diff --git a/configure.ac b/configure.ac
index af9b3b1d..bbd127c8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,399 +1,398 @@
#
# Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
#
AC_PREREQ([2.63])
AC_INIT([kronosnet],
m4_esyscmd([build-aux/git-version-gen .tarball-version]),
[devel@lists.kronosnet.org])
AC_USE_SYSTEM_EXTENSIONS
AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects])
LT_PREREQ([2.2.6])
LT_INIT
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([kronosnetd/main.c])
AC_CONFIG_HEADERS([config.h])
AC_CANONICAL_HOST
AC_PROG_LIBTOOL
AC_LANG([C])
systemddir=${prefix}/lib/systemd/system
if test "$prefix" = "NONE"; then
prefix="/usr"
if test "$localstatedir" = "\${prefix}/var"; then
localstatedir="/var"
fi
if test "$sysconfdir" = "\${prefix}/etc"; then
sysconfdir="/etc"
fi
if test "$systemddir" = "NONE/lib/systemd/system"; then
systemddir=/lib/systemd/system
fi
if test "$libdir" = "\${exec_prefix}/lib"; then
if test -e /usr/lib64; then
libdir="/usr/lib64"
else
libdir="/usr/lib"
fi
fi
fi
# Checks for programs.
if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then
AC_MSG_ERROR(["you don't seem to have GNU make; it is required"])
fi
AC_PROG_AWK
AC_PROG_GREP
AC_PROG_SED
AC_PROG_CPP
AC_PROG_CC
AM_PROG_CC_C_O
AC_PROG_LN_S
AC_PROG_INSTALL
AC_PROG_MAKE_SET
AC_PROG_CXX
AC_PROG_RANLIB
AC_CHECK_PROGS([PUBLICAN], [publican], [:])
AC_CHECK_PROGS([PKGCONFIG], [pkg-config])
AC_ARG_ENABLE([poc],
[ --enable-poc : build poc code ],,
[ enable_poc="yes" ])
AM_CONDITIONAL([BUILD_POC], test x$enable_poc = xyes)
AC_ARG_ENABLE([kronosnetd],
[ --enable-kronosnetd : Kronosnetd support ],,
[ enable_kronosnetd="no" ])
AM_CONDITIONAL([BUILD_KRONOSNETD], test x$enable_kronosnetd = xyes)
AC_ARG_ENABLE([libtap],
[ --enable-libtap : libtap support ],,
[ enable_libtap="no" ])
if test "x$enable_kronosnetd" = xyes; then
enable_libtap=yes
fi
AM_CONDITIONAL([BUILD_LIBTAP], test x$enable_libtap = xyes)
AC_ARG_ENABLE([libknet-sctp],
[ --enable-libknet-sctp : libknet SCTP support ],,
[ enable_libknet_sctp="yes" ])
## local helper functions
# this function checks if CC supports options passed as
# args. Global CFLAGS are ignored during this test.
cc_supports_flag() {
saveCPPFLAGS="$CPPFLAGS"
CPPFLAGS="$@"
if echo $CC | grep -q clang; then
CPPFLAGS="-Werror $CPPFLAGS"
fi
AC_MSG_CHECKING([whether $CC supports "$@"])
AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])],
[RC=0; AC_MSG_RESULT([yes])],
[RC=1; AC_MSG_RESULT([no])])
CPPFLAGS="$saveCPPFLAGS"
return $RC
}
# helper macro to check libs without adding them to LIBS
check_lib_no_libs() {
lib_no_libs_arg1=$1
shift
lib_no_libs_arg2=$1
shift
lib_no_libs_args=$@
AC_CHECK_LIB([$lib_no_libs_arg1],
[$lib_no_libs_arg2],,,
[$lib_no_libs_args])
LIBS=$ac_check_lib_save_LIBS
}
# Checks for C features
AC_C_INLINE
# Checks for libraries.
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([m], [ceil])
AC_CHECK_LIB([rt], [clock_gettime])
PKG_CHECK_MODULES([nss],[nss])
# Checks for header files.
AC_CHECK_HEADERS([fcntl.h])
AC_CHECK_HEADERS([stdlib.h])
AC_CHECK_HEADERS([string.h])
AC_CHECK_HEADERS([strings.h])
AC_CHECK_HEADERS([sys/ioctl.h])
AC_CHECK_HEADERS([syslog.h])
AC_CHECK_HEADERS([unistd.h])
AC_CHECK_HEADERS([netinet/in.h])
AC_CHECK_HEADERS([sys/socket.h])
AC_CHECK_HEADERS([arpa/inet.h])
AC_CHECK_HEADERS([netdb.h])
AC_CHECK_HEADERS([limits.h])
AC_CHECK_HEADERS([stdint.h])
AC_CHECK_HEADERS([sys/epoll.h])
if test "x$enable_libknet_sctp" = xyes; then
AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"]))
fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_INLINE
AC_TYPE_SIZE_T
AC_TYPE_PID_T
AC_TYPE_SSIZE_T
AC_TYPE_UINT8_T
AC_TYPE_UINT16_T
AC_TYPE_UINT32_T
AC_TYPE_UINT64_T
AC_TYPE_INT32_T
# Checks for library functions.
AC_FUNC_ALLOCA
AC_FUNC_FORK
AC_FUNC_MALLOC
AC_FUNC_REALLOC
AC_CHECK_FUNCS([memset])
AC_CHECK_FUNCS([strdup])
AC_CHECK_FUNCS([strerror])
AC_CHECK_FUNCS([dup2])
AC_CHECK_FUNCS([select])
AC_CHECK_FUNCS([socket])
AC_CHECK_FUNCS([inet_ntoa])
AC_CHECK_FUNCS([memmove])
AC_CHECK_FUNCS([strchr])
AC_CHECK_FUNCS([atexit])
AC_CHECK_FUNCS([ftruncate])
AC_CHECK_FUNCS([strrchr])
AC_CHECK_FUNCS([strstr])
AC_CHECK_FUNCS([clock_gettime])
AC_CHECK_FUNCS([strcasecmp])
-AC_CHECK_FUNCS([recvmmsg])
AC_CHECK_FUNCS([kevent])
# if neither sys/epoll.h nor kevent are present, we should fail.
if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then
AC_MSG_ERROR([Both epoll and kevent unavailable on this OS])
fi
if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then
AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code])
fi
# checks (for kronosnetd)
if test "x$enable_kronosnetd" = xyes; then
AC_CHECK_HEADERS([security/pam_appl.h],
[AC_CHECK_LIB([pam], [pam_start])],
[AC_MSG_ERROR([Unable to find LinuxPAM devel files])])
AC_CHECK_HEADERS([security/pam_misc.h],
[AC_CHECK_LIB([pam_misc], [misc_conv])],
[AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])])
PKG_CHECK_MODULES([libqb], [libqb])
AC_CHECK_LIB([qb], [qb_log_thread_priority_set],
[have_qb_log_thread_priority_set="yes"],
[have_qb_log_thread_priority_set="no"])
if test "x${have_qb_log_thread_priority_set}" = xyes; then
AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], 1, [have qb_log_thread_priority_set])
fi
fi
# local options
AC_ARG_ENABLE([debug],
[ --enable-debug enable debug build. ],
[ default="no" ])
AC_ARG_ENABLE([publicandocs],
[ --enable-publicandocs enable docs build. ],
[ default="no" ])
AC_ARG_WITH([initdefaultdir],
[ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ],
[ INITDEFAULTDIR="$withval" ],
[ INITDEFAULTDIR="$sysconfdir/default" ])
AC_ARG_WITH([initddir],
[ --with-initddir=DIR : path to init script directory. ],
[ INITDDIR="$withval" ],
[ INITDDIR="$sysconfdir/init.d" ])
AC_ARG_WITH([systemddir],
[ --with-systemddir=DIR : path to systemd unit files directory. ],
[ SYSTEMDDIR="$withval" ],
[ SYSTEMDDIR="$systemddir" ])
AC_ARG_WITH([syslogfacility],
[ --with-syslogfacility=FACILITY
default syslog facility. ],
[ SYSLOGFACILITY="$withval" ],
[ SYSLOGFACILITY="LOG_DAEMON" ])
AC_ARG_WITH([sysloglevel],
[ --with-sysloglevel=LEVEL
default syslog level. ],
[ SYSLOGLEVEL="$withval" ],
[ SYSLOGLEVEL="LOG_INFO" ])
AC_ARG_WITH([defaultadmgroup],
[ --with-defaultadmgroup=GROUP
define PAM group. Users part of this group will be
allowed to configure kronosnet. Others will only
receive read-only rights. ],
[ DEFAULTADMGROUP="$withval" ],
[ DEFAULTADMGROUP="kronosnetadm" ])
## random vars
LOGDIR=${localstatedir}/log/
RUNDIR=${localstatedir}/run/
DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet
## do subst
AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes])
AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes])
AC_SUBST([DEFAULT_CONFIG_DIR])
AC_SUBST([INITDEFAULTDIR])
AC_SUBST([INITDDIR])
AC_SUBST([SYSTEMDDIR])
AC_SUBST([LOGDIR])
AC_SUBST([DEFAULTADMGROUP])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR],
["$(eval echo ${DEFAULT_CONFIG_DIR})"],
[Default config directory])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE],
["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"],
[Default config file])
AC_DEFINE_UNQUOTED([LOGDIR],
["$(eval echo ${LOGDIR})"],
[Default logging directory])
AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE],
["$(eval echo ${LOGDIR}/kronosnetd.log)"],
[Default log file])
AC_DEFINE_UNQUOTED([RUNDIR],
["$(eval echo ${RUNDIR})"],
[Default run directory])
AC_DEFINE_UNQUOTED([SYSLOGFACILITY],
[$(eval echo ${SYSLOGFACILITY})],
[Default syslog facility])
AC_DEFINE_UNQUOTED([SYSLOGLEVEL],
[$(eval echo ${SYSLOGLEVEL})],
[Default syslog level])
AC_DEFINE_UNQUOTED([DEFAULTADMGROUP],
["$(eval echo ${DEFAULTADMGROUP})"],
[Default admin group])
## *FLAGS handling
ENV_CFLAGS="$CFLAGS"
ENV_CPPFLAGS="$CPPFLAGS"
ENV_LDFLAGS="$LDFLAGS"
# debug build stuff
if test "x${enable_debug}" = xyes; then
AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code])
OPT_CFLAGS="-O0"
else
OPT_CFLAGS="-O3"
fi
# gdb flags
if test "x${GCC}" = xyes; then
GDB_FLAGS="-ggdb3"
else
GDB_FLAGS="-g"
fi
# extra warnings
EXTRA_WARNINGS=""
WARNLIST="
all
shadow
missing-prototypes
missing-declarations
strict-prototypes
declaration-after-statement
pointer-arith
write-strings
cast-align
bad-function-cast
missing-format-attribute
format=2
format-security
format-nonliteral
no-long-long
unsigned-char
gnu89-inline
no-strict-aliasing
error
address
cpp
overflow
parentheses
sequence-point
switch
uninitialized
unused-but-set-variable
unused-function
unused-result
unused-value
unused-variable
"
for j in $WARNLIST; do
if cc_supports_flag -W$j; then
EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j";
fi
done
CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \
$EXTRA_WARNINGS $WERROR_CFLAGS"
CPPFLAGS="$ENV_CPPFLAGS"
LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed"
AC_CONFIG_FILES([
Makefile
init/Makefile
libtap/Makefile
libtap/libtap.pc
kronosnetd/Makefile
kronosnetd/kronosnetd.logrotate
libknet/Makefile
libknet/libknet.pc
libknet/tests/Makefile
docs/Makefile
poc-code/Makefile
poc-code/iov-hash/Makefile
poc-code/access-list/Makefile
])
AC_OUTPUT
diff --git a/libknet/compat.c b/libknet/compat.c
index a7b5c244..479db2e7 100644
--- a/libknet/compat.c
+++ b/libknet/compat.c
@@ -1,159 +1,114 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Author: Jan Friesse <jfriesse@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <sys/syscall.h>
#include "compat.h"
-#ifndef HAVE_RECVMMSG
-extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags, struct timespec *timeout)
-{
-#ifdef SYS_recvmmsg
- /*
- * For systems where kernel supports recvmmsg but glibc doesn't (RHEL 6)
- */
- return (syscall(SYS_recvmmsg, sockfd, msgvec, vlen, flags, timeout));
-#else
- /*
- * Generic implementation of recvmmsg using recvmsg
- */
- unsigned int i;
- ssize_t ret;
-
- if (vlen == 0) {
- return (0);
- }
-
- if ((timeout != NULL) || (flags & MSG_WAITFORONE)) {
- /*
- * Not implemented
- */
- errno = EINVAL;
- return (-1);
- }
-
- for (i = 0; i < vlen; i++) {
- ret = recvmsg(sockfd, &msgvec[i].msg_hdr, flags);
- if (ret >= 0) {
- msgvec[i].msg_len = ret;
- } else {
- if (ret == -1 && errno == EAGAIN) {
- ret = 0;
- }
- break ;
- }
- }
-
- return ((ret >= 0) ? i : ret);
-#endif
-}
-#endif
-
#ifndef HAVE_SYS_EPOLL_H
#ifdef HAVE_KEVENT
/* for FreeBSD which has kevent instead of epoll */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/errno.h>
static int32_t
_poll_to_filter_(int32_t event)
{
int32_t out = 0;
if (event & POLLIN)
out |= EVFILT_READ;
if (event & POLLOUT)
out |= EVFILT_WRITE;
return out;
}
int epoll_create(int size)
{
return kqueue();
}
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
{
int ret = 0;
struct kevent ke;
short filters = _poll_to_filter_(event->events);
switch (op) {
/* The kevent man page says that EV_ADD also does MOD */
case EPOLL_CTL_ADD:
case EPOLL_CTL_MOD:
EV_SET(&ke, fd, filters, EV_ADD | EV_ENABLE, 0, 0, event->data.ptr);
break;
case EPOLL_CTL_DEL:
EV_SET(&ke, fd, filters, EV_DELETE, 0, 0, event->data.ptr);
break;
default:
errno = EINVAL;
return -1;
}
ret = kevent(epfd, &ke, 1, NULL, 0, NULL);
return ret;
}
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms)
{
struct kevent kevents[maxevents];
struct timespec timeout = { 0, 0 };
struct timespec *timeout_ptr = &timeout;
uint32_t revents;
int event_count;
int i;
int returned_events;
if (timeout_ms != -1) {
timeout.tv_sec = timeout_ms/1000;
timeout.tv_nsec += (timeout_ms % 1000) * 1000000ULL;
}
else {
timeout_ptr = NULL;
}
event_count = kevent(epfd, NULL, 0, kevents, maxevents, timeout_ptr);
if (event_count == -1) {
return -1;
}
returned_events = 0;
for (i = 0; i < event_count; i++) {
revents = 0;
if (kevents[i].flags & EV_ERROR) {
revents |= POLLERR;
}
if (kevents[i].flags & EV_EOF) {
revents |= POLLHUP;
}
if (kevents[i].filter == EVFILT_READ) {
revents |= POLLIN;
}
if (kevents[i].filter == EVFILT_WRITE) {
revents |= POLLOUT;
}
events[returned_events].events = revents;
events[returned_events].data.ptr = kevents[i].udata;
returned_events++;
}
return returned_events;
}
#endif /* HAVE_KEVENT */
#endif /* HAVE_SYS_EPOLL_H */
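
The kqueue-backed shim above exposes the same three entry points as native epoll, so callers need no platform conditionals. The following minimal sketch is not part of the patch; the UDP socket and buffer size are illustrative. It drives epoll_create/epoll_ctl/epoll_wait the same way libknet's threads do and compiles unchanged whether compat.h resolves to <sys/epoll.h> or to the kqueue wrapper:

/* Minimal sketch (not part of the patch): drive the epoll API that compat.h
 * exposes; works against native epoll or the kqueue shim above. */
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "compat.h"	/* <sys/epoll.h> on Linux, kqueue shim elsewhere */

int main(void)
{
	struct epoll_event ev, events[8];
	char buf[1500];
	int sock = socket(AF_INET, SOCK_DGRAM, 0);
	int epfd = epoll_create(8);	/* size hint; the shim ignores it */
	int i, nev;

	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.fd = sock;	/* the shim round-trips the whole epoll_data union via udata */
	epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);

	nev = epoll_wait(epfd, events, 8, 1000);	/* wait up to 1s */
	for (i = 0; i < nev; i++) {
		if (events[i].events & EPOLLIN) {
			recv(events[i].data.fd, buf, sizeof(buf), MSG_DONTWAIT);
		}
	}

	close(epfd);
	close(sock);
	return 0;
}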
diff --git a/libknet/compat.h b/libknet/compat.h
index 48297869..6f6cccfc 100644
--- a/libknet/compat.h
+++ b/libknet/compat.h
@@ -1,66 +1,50 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Authors: Jan Friesse <jfriesse@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __COMPAT_H__
#define __COMPAT_H__
#include "config.h"
#include <sys/socket.h>
#include <stdint.h>
-
-/* FreeBSD has recvmmsg but it's a buggy wrapper */
-#ifdef __FreeBSD__
-#define recvmmsg COMPAT_recvmmsg
-#undef HAVE_RECVMMSG
-#endif
-
-#ifndef MSG_WAITFORONE
-#define MSG_WAITFORONE 0x10000
-#endif
-
-#ifndef HAVE_RECVMMSG
-extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags, struct timespec *timeout);
-#endif
-
#ifndef ETIME
#define ETIME ETIMEDOUT
#endif
#ifdef HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#else
#ifdef HAVE_KEVENT
#include <poll.h>
#define EPOLL_CTL_ADD 1
#define EPOLL_CTL_MOD 2
#define EPOLL_CTL_DEL 3
#define EPOLLIN POLLIN
#define EPOLLOUT POLLOUT
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms);
#endif /* HAVE_KEVENT */
#endif /* HAVE_SYS_EPOLL_H */
#endif /* __COMPAT_H__ */
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index d85bd429..7dce1e2e 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,727 +1,727 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "compat.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this function needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
if (errno == ETIME) {
log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
}
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not in use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event when the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymmetric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet length
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
if (knet_h->crypto_instance) {
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
return;
}
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
}
if (len < (KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
src_link = NULL;
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
}
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
/*
* TODO: should we accept data even if we can't reply to the other node?
* how would that work with SCTP and guaranteed delivery?
*/
if (!src_host->status.reachable) {
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
//return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and re-add it only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
int host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
return;
}
if (outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed pings, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrive in bursts over all the links;
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
retry_pong:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
if (len != outlen) {
err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
goto retry_pong;
break;
}
}
break;
case KNET_HEADER_TYPE_PONG:
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
src_link->status.latency =
((src_link->status.latency * src_link->latency_exp) +
((latency_last / 1000llu) *
(src_link->latency_fix - src_link->latency_exp))) /
src_link->latency_fix;
if (src_link->status.latency < src_link->pong_timeout) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
break;
case KNET_HEADER_TYPE_PMTUD:
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
retry_pmtud:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
if (len != outlen) {
err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
goto retry_pmtud;
break;
}
}
break;
case KNET_HEADER_TYPE_PMTUD_REPLY:
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
break;
default:
return;
}
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if an fd got an event but the link was removed
* by another thread before we grabbed the read lock
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_FRAG_MAX; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
- msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ msg_recv = _recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, the error is recorded on the socket (getsockopt(SO_ERROR)) and
* it will be visible in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol base
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0-byte message... go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocol packet handlers are good
*/
switch(err) {
case -1: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case 0: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case 1: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case 2: /* packet is data and should be parsed as such */
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_FRAG_MAX];
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
struct iovec iov_in[PCKT_FRAG_MAX];
memset(&msg, 0, sizeof(msg));
for (i = 0; i < PCKT_FRAG_MAX; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
return NULL;
}
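
For reference, the PONG branch above folds every new round-trip sample into a weighted moving average, giving the previous value a weight of latency_exp/latency_fix and the new sample the remainder. A standalone arithmetic sketch follows; the 7/8 weighting and the sample values are made up, since in libknet both factors are per-link fields and the raw sample comes from timespec_diff() on the echoed timestamp:

/* Standalone sketch of the latency smoothing in the PONG handler above.
 * Weights and samples are example values, not libknet defaults. */
#include <stdio.h>

static unsigned long long smooth(unsigned long long prev,
				 unsigned long long sample,
				 unsigned long long latency_fix,
				 unsigned long long latency_exp)
{
	/* weight latency_exp/latency_fix goes to the previous value,
	 * the remainder to the new sample */
	return ((prev * latency_exp) +
		(sample * (latency_fix - latency_exp))) / latency_fix;
}

int main(void)
{
	unsigned long long latency = 0;
	unsigned long long samples[] = { 500, 520, 480, 2000, 510 }; /* usec */
	unsigned int i;

	for (i = 0; i < sizeof(samples) / sizeof(samples[0]); i++) {
		latency = smooth(latency, samples[i], 8, 7); /* 7/8 old, 1/8 new */
		printf("sample %llu usec -> smoothed %llu usec\n",
		       samples[i], latency);
	}
	return 0;
}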
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 23962c8d..6c2bca8d 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,609 +1,609 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "compat.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0;
struct knet_mmsghdr *cur;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sent_msgs = 0;
prev_sent = 0;
progress = 1;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
msg_idx++;
}
retry:
cur = &msg[prev_sent];
sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
switch(err) {
case -1: /* unrecoverable error */
goto out_unlock;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
goto retry;
break;
}
prev_sent = prev_sent + sent_msgs;
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
#endif
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
}
out_unlock:
errno = savederrno;
return err;
}
static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
{
ssize_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX];
uint8_t frag_idx;
unsigned int temp_data_mtu;
int host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
inbuf = knet_h->recv_from_sock_buf[buf_idx];
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destination hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
if (dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming mininum IPv4 mtu (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid the value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and is rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in a 3+ node setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in between. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
} else {
iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
} else {
iov_out[frag_idx].iov_base = (void *)inbuf;
iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
}
if (knet_h->crypto_instance) {
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
if (crypto_encrypt_and_sign(
knet_h,
(const unsigned char *)iov_out[frag_idx].iov_base,
iov_out[frag_idx].iov_len,
knet_h->send_to_links_buf_crypt[frag_idx],
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx].iov_len = outlen;
frag_idx++;
}
}
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx];
msg[msg_idx].msg_hdr.msg_iovlen = 1;
msg_idx++;
}
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
strerror(savederrno));
err = -1;
goto out;
}
knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
savederrno = errno;
pthread_mutex_unlock(&knet_h->tx_mutex);
out:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct knet_mmsghdr *msg, int type)
{
ssize_t inlen = 0;
struct iovec iov_in;
int msg_recv, i;
int savederrno = 0, docallback = 0;
if ((channel >= 0) &&
(channel < KNET_DATAFD_MAX) &&
(!knet_h->sockfd[channel].is_socket)) {
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
iov_in.iov_len = KNET_MAX_PACKET_SIZE;
inlen = readv(sockfd, &iov_in, 1);
if (inlen <= 0) {
savederrno = errno;
docallback = 1;
goto out;
}
msg_recv = 1;
knet_h->recv_from_sock_buf[0]->kh_type = type;
_parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
} else {
- msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ msg_recv = _recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL);
if (msg_recv < 0) {
inlen = msg_recv;
savederrno = errno;
docallback = 1;
goto out;
}
for (i = 0; i < msg_recv; i++) {
inlen = msg[i].msg_len;
if (inlen == 0) {
savederrno = 0;
docallback = 1;
goto out;
break;
}
knet_h->recv_from_sock_buf[i]->kh_type = type;
_parse_recv_from_sock(knet_h, i, inlen, channel, 0);
}
}
out:
if (inlen < 0) {
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
}
if (docallback) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_TX,
inlen,
savederrno);
}
}
void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_FRAG_MAX];
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
struct iovec iov_in[PCKT_FRAG_MAX];
int i, nev, type;
int8_t channel;
memset(&msg, 0, sizeof(msg));
/* preparing data buffer */
for (i = 0; i < PCKT_FRAG_MAX; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);
knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
continue;
}
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->hostsockfd[0]) {
type = KNET_HEADER_TYPE_HOST_INFO;
channel = -1;
} else {
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
}
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, events[i].data.fd, channel, &msg[0], type);
pthread_mutex_unlock(&knet_h->tx_mutex);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
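
The TX path above splits a payload into khp_data_frag_num = ceil(inlen / data_mtu) fragments, each carrying at most one MTU of user data plus the knet data header, with the last fragment carrying the remainder. A small sketch of just that size calculation; HEADER_SIZE and the MTU value are placeholders, not the real KNET_HEADER_DATA_SIZE or the PMTUd result:

/* Sketch of the fragment sizing done in _parse_recv_from_sock() above.
 * HEADER_SIZE and mtu are stand-ins; link with -lm for ceil(). */
#include <math.h>
#include <stdio.h>
#include <sys/types.h>

#define HEADER_SIZE 32	/* placeholder for KNET_HEADER_DATA_SIZE */

int main(void)
{
	ssize_t inlen = 3000;	/* payload handed in by the application */
	ssize_t mtu = 1200;	/* placeholder for knet_h->data_mtu */
	unsigned int frag_num = ceil((float)inlen / mtu);
	ssize_t frag_len = inlen;
	unsigned int frag_idx;

	for (frag_idx = 0; frag_idx < frag_num; frag_idx++) {
		/* every fragment is one MTU of user data plus the data header,
		 * except the last one, which carries the remainder */
		ssize_t wire_len = (frag_len > mtu ? mtu : frag_len) + HEADER_SIZE;
		printf("frag %u/%u: %zd bytes on the wire\n",
		       frag_idx + 1, frag_num, wire_len);
		frag_len -= mtu;
	}
	return 0;
}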
diff --git a/libknet/transport_common.c b/libknet/transport_common.c
index d56d4e0e..2b7187d7 100644
--- a/libknet/transport_common.c
+++ b/libknet/transport_common.c
@@ -1,428 +1,450 @@
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transports.h"
/*
* reuse Jan Friesse's compat layer as a wrapper to drop usage of sendmmsg and recvmmsg
*
* TODO: kill those wrappers once we work on guaranteed packet delivery
*/
+int _recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
+{
+ int savederrno = 0, err = 0;
+ unsigned int i;
+
+ for (i = 0; i < vlen; i++) {
+ err = recvmsg(sockfd, &msgvec[i].msg_hdr, flags);
+ savederrno = errno;
+ if (err >= 0) {
+ msgvec[i].msg_len = err;
+ } else {
+ if (err == -1 && errno == EAGAIN) {
+ err = 0;
+ }
+ break;
+ }
+ }
+
+ errno = savederrno;
+ return ((err >= 0) ? i : err);
+}
+
int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
{
int savederrno = 0, err = 0;
unsigned int i;
for (i = 0; i < vlen; i++) {
err = sendmsg(sockfd, &msgvec[i].msg_hdr, flags);
savederrno = errno;
if (err >= 0) {
msgvec[i].msg_len = err;
} else {
break;
}
}
errno = savederrno;
return ((err >= 0) ? vlen : err);
}
int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type)
{
int err = 0, savederrno = 0;
int value;
if (_fdset_cloexec(sock)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s",
type, strerror(savederrno));
goto exit_error;
}
if (_fdset_nonblock(sock)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s",
type, strerror(savederrno));
goto exit_error;
}
value = KNET_RING_RCVBUFF;
#ifdef SO_RCVBUFFORCE
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s",
type, strerror(savederrno));
goto exit_error;
}
#else
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_RECVBUF: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
value = KNET_RING_RCVBUFF;
#ifdef SO_SNDBUFFORCE
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s",
type, strerror(savederrno));
goto exit_error;
}
#else
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_SNDBUF: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
exit_error:
errno = savederrno;
return err;
}
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type)
{
int err = 0, savederrno = 0;
int value;
if (_configure_common_socket(knet_h, sock, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
#ifdef IP_FREEBIND
value = 1;
if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
#ifdef IP_BINDANY /* BSD */
value = 1;
if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
if (address->ss_family == AF_INET6) {
value = 1;
if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
&value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s",
type, strerror(savederrno));
goto exit_error;
}
#ifdef IPV6_MTU_DISCOVER
value = IPV6_PMTUDISC_PROBE;
if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#else
value = 1;
if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
} else {
#ifdef IP_MTU_DISCOVER
value = IP_PMTUDISC_PROBE;
if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#else
value = 1;
if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
type, strerror(savederrno));
goto exit_error;
}
#endif
}
value = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s",
type, strerror(savederrno));
goto exit_error;
}
exit_error:
errno = savederrno;
return err;
}
int _init_socketpair(knet_handle_t knet_h, int *sock)
{
int err = 0, savederrno = 0;
int i;
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
for (i = 0; i < 2; i++) {
if (_configure_common_socket(knet_h, sock[i], "local socketpair") < 0) {
savederrno = errno;
err = -1;
goto exit_fail;
}
}
exit_fail:
errno = savederrno;
return err;
}
void _close_socketpair(knet_handle_t knet_h, int *sock)
{
int i;
for (i = 0; i < 2; i++) {
if (sock[i]) {
close(sock[i]);
sock[i] = 0;
}
}
}
/*
* must be called with global read lock
*
* return -1 on error
* return 0 if fd is invalid
* return 1 if fd is valid
*/
int _is_valid_fd(knet_handle_t knet_h, int sockfd)
{
int ret = 0;
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
if (sockfd > KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) {
ret = 0;
} else {
ret = 1;
}
return ret;
}
/*
* must be called with global write lock
*/
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data)
{
if (sockfd < 0) {
errno = EINVAL;
return -1;
}
if (sockfd > KNET_MAX_FDS) {
errno = EINVAL;
return -1;
}
knet_h->knet_transport_fd_tracker[sockfd].transport = transport;
knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type;
knet_h->knet_transport_fd_tracker[sockfd].data = data;
return 0;
}
/*
* public api
*/
int knet_handle_get_transport_list(knet_handle_t knet_h,
struct transport_info *transport_list, size_t *transport_list_entries)
{
int err = 0, savederrno = 0;
int i, count;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!transport_list) {
errno = EINVAL;
return -1;
}
if (!transport_list_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
count = 0;
/*
* we could potentially build this struct
* at knet_handle_new init time, but
* let's keep it dynamic in case at some point
* we need to init transports dynamically
* at runtime vs init time.
*/
for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
if (knet_h->transport_ops[i]) {
transport_list[count].name = knet_h->transport_ops[i]->transport_name;
transport_list[count].id = knet_h->transport_ops[i]->transport_id;
count++;
}
}
*transport_list_entries = count;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return err;
}
const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport)
{
int savederrno = 0;
const char *name = NULL;
if (!knet_h) {
errno = EINVAL;
return name;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return name;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return name;
}
if (knet_h->transport_ops[transport]) {
name = knet_h->transport_ops[transport]->transport_name;
} else {
savederrno = ENOENT;
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return name;
}
uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name)
{
int savederrno = 0;
uint8_t err = KNET_MAX_TRANSPORTS;
int i;
if (!knet_h) {
errno = EINVAL;
return err;
}
if (!name) {
errno = EINVAL;
return err;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return err;
}
for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
if (knet_h->transport_ops[i]) {
if (!strcmp(knet_h->transport_ops[i]->transport_name, name)) {
err = knet_h->transport_ops[i]->transport_id;
break;
}
}
}
if (err == KNET_MAX_TRANSPORTS) {
savederrno = EINVAL;
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
diff --git a/libknet/transports.h b/libknet/transports.h
index e47f7128..e6f65f8f 100644
--- a/libknet/transports.h
+++ b/libknet/transports.h
@@ -1,26 +1,27 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __TRANSPORTS_H__
#define __TRANSPORTS_H__
knet_transport_ops_t *get_udp_transport(void);
knet_transport_ops_t *get_sctp_transport(void);
int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type);
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type);
int _init_socketpair(knet_handle_t knet_h, int *sock);
void _close_socketpair(knet_handle_t knet_h, int *sock);
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data);
int _is_valid_fd(knet_handle_t knet_h, int sockfd);
int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags);
+int _recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags);
#endif
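
Both wrappers declared here keep the batched mmsghdr calling convention while issuing one recvmsg()/sendmsg() per message under the hood. Below is a hypothetical receive loop against _recvmmsg() that mirrors the msg preparation done in the RX/TX threads; the socket, NMSG and BUFSZ are illustrative, and on glibc struct mmsghdr requires _GNU_SOURCE:

/* Hypothetical caller of the _recvmmsg() wrapper declared above
 * (link against libknet's transport_common.o); sizes are illustrative. */
#define _GNU_SOURCE		/* struct mmsghdr on glibc */
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define NMSG 8
#define BUFSZ 65536

int _recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags);

int main(void)
{
	static char bufs[NMSG][BUFSZ];
	struct sockaddr_storage addrs[NMSG];
	struct iovec iovs[NMSG];
	struct mmsghdr msgs[NMSG];
	int sock = socket(AF_INET, SOCK_DGRAM, 0);
	int i, got;

	memset(msgs, 0, sizeof(msgs));
	for (i = 0; i < NMSG; i++) {
		iovs[i].iov_base = bufs[i];
		iovs[i].iov_len = BUFSZ;
		msgs[i].msg_hdr.msg_name = &addrs[i];
		msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msgs[i].msg_hdr.msg_iov = &iovs[i];
		msgs[i].msg_hdr.msg_iovlen = 1;
	}

	/* non-blocking batch receive: the return value is how many
	 * msg_len entries were filled in (0 if nothing was pending) */
	got = _recvmmsg(sock, msgs, NMSG, MSG_DONTWAIT | MSG_NOSIGNAL);
	for (i = 0; i < got; i++) {
		printf("msg %d: %u bytes\n", i, msgs[i].msg_len);
	}
	return 0;
}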
