Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3155061
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
43 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/configure.ac b/configure.ac
index 74c00e53..af9b3b1d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,400 +1,399 @@
#
# Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
#
AC_PREREQ([2.63])
AC_INIT([kronosnet],
m4_esyscmd([build-aux/git-version-gen .tarball-version]),
[devel@lists.kronosnet.org])
AC_USE_SYSTEM_EXTENSIONS
AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects])
LT_PREREQ([2.2.6])
LT_INIT
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([kronosnetd/main.c])
AC_CONFIG_HEADERS([config.h])
AC_CANONICAL_HOST
AC_PROG_LIBTOOL
AC_LANG([C])
systemddir=${prefix}/lib/systemd/system
if test "$prefix" = "NONE"; then
prefix="/usr"
if test "$localstatedir" = "\${prefix}/var"; then
localstatedir="/var"
fi
if test "$sysconfdir" = "\${prefix}/etc"; then
sysconfdir="/etc"
fi
if test "$systemddir" = "NONE/lib/systemd/system"; then
systemddir=/lib/systemd/system
fi
if test "$libdir" = "\${exec_prefix}/lib"; then
if test -e /usr/lib64; then
libdir="/usr/lib64"
else
libdir="/usr/lib"
fi
fi
fi
# Checks for programs.
if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then
AC_MSG_ERROR(["you don't seem to have GNU make; it is required"])
fi
AC_PROG_AWK
AC_PROG_GREP
AC_PROG_SED
AC_PROG_CPP
AC_PROG_CC
AM_PROG_CC_C_O
AC_PROG_LN_S
AC_PROG_INSTALL
AC_PROG_MAKE_SET
AC_PROG_CXX
AC_PROG_RANLIB
AC_CHECK_PROGS([PUBLICAN], [publican], [:])
AC_CHECK_PROGS([PKGCONFIG], [pkg-config])
AC_ARG_ENABLE([poc],
[ --enable-poc : build poc code ],,
[ enable_poc="yes" ])
AM_CONDITIONAL([BUILD_POC], test x$enable_poc = xyes)
AC_ARG_ENABLE([kronosnetd],
[ --enable-kronosnetd : Kronosnetd support ],,
[ enable_kronosnetd="no" ])
AM_CONDITIONAL([BUILD_KRONOSNETD], test x$enable_kronosnetd = xyes)
AC_ARG_ENABLE([libtap],
[ --enable-libtap : libtap support ],,
[ enable_libtap="no" ])
if test "x$enable_kronosnetd" = xyes; then
enable_libtap=yes
fi
AM_CONDITIONAL([BUILD_LIBTAP], test x$enable_libtap = xyes)
AC_ARG_ENABLE([libknet-sctp],
[ --enable-libknet-sctp : libknet SCTP support ],,
[ enable_libknet_sctp="yes" ])
## local helper functions
# cc_supports_flag FLAG...
#
# Shell helper that reports whether the compiler accepts the given
# option(s). Returns 0 when accepted and 1 otherwise, so it can be used
# directly as an "if" condition.
#
# The probe is run through AC_PREPROC_IFELSE with CPPFLAGS temporarily
# replaced by the arguments, so only the preprocessor stage is exercised.
# The caller's CPPFLAGS value is restored before returning.
cc_supports_flag() {
saveCPPFLAGS="$CPPFLAGS"
CPPFLAGS="$@"
# when CC looks like clang, add -Werror to the probe
# (presumably so unknown options fail instead of only warning -- confirm)
if echo $CC | grep -q clang; then
CPPFLAGS="-Werror $CPPFLAGS"
fi
AC_MSG_CHECKING([whether $CC supports "$@"])
AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])],
[RC=0; AC_MSG_RESULT([yes])],
[RC=1; AC_MSG_RESULT([no])])
# put the original preprocessor flags back before reporting the result
CPPFLAGS="$saveCPPFLAGS"
return $RC
}
# check_lib_no_libs LIBRARY FUNCTION OTHER-LIBS...
#
# Shell helper around AC_CHECK_LIB that probes for FUNCTION in LIBRARY
# and then resets LIBS, so the library is not left in the global link line.
# The first argument is the library name, the second the symbol to look
# for, and anything remaining is forwarded as the extra libraries argument.
check_lib_no_libs() {
lib_no_libs_arg1=$1
shift
lib_no_libs_arg2=$1
shift
lib_no_libs_args=$@
AC_CHECK_LIB([$lib_no_libs_arg1],
[$lib_no_libs_arg2],,,
[$lib_no_libs_args])
# NOTE(review): ac_check_lib_save_LIBS is not set in this file; it is
# presumably the saved LIBS value left behind by the AC_CHECK_LIB
# expansion -- confirm against the autoconf version in use.
LIBS=$ac_check_lib_save_LIBS
}
# Checks for C features
AC_C_INLINE
# Checks for libraries.
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([m], [ceil])
AC_CHECK_LIB([rt], [clock_gettime])
PKG_CHECK_MODULES([nss],[nss])
# Checks for header files.
AC_CHECK_HEADERS([fcntl.h])
AC_CHECK_HEADERS([stdlib.h])
AC_CHECK_HEADERS([string.h])
AC_CHECK_HEADERS([strings.h])
AC_CHECK_HEADERS([sys/ioctl.h])
AC_CHECK_HEADERS([syslog.h])
AC_CHECK_HEADERS([unistd.h])
AC_CHECK_HEADERS([netinet/in.h])
AC_CHECK_HEADERS([sys/socket.h])
AC_CHECK_HEADERS([arpa/inet.h])
AC_CHECK_HEADERS([netdb.h])
AC_CHECK_HEADERS([limits.h])
AC_CHECK_HEADERS([stdint.h])
AC_CHECK_HEADERS([sys/epoll.h])
if test "x$enable_libknet_sctp" = xyes; then
AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"]))
fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_INLINE
AC_TYPE_SIZE_T
AC_TYPE_PID_T
AC_TYPE_SSIZE_T
AC_TYPE_UINT8_T
AC_TYPE_UINT16_T
AC_TYPE_UINT32_T
AC_TYPE_UINT64_T
AC_TYPE_INT32_T
# Checks for library functions.
AC_FUNC_ALLOCA
AC_FUNC_FORK
AC_FUNC_MALLOC
AC_FUNC_REALLOC
AC_CHECK_FUNCS([memset])
AC_CHECK_FUNCS([strdup])
AC_CHECK_FUNCS([strerror])
AC_CHECK_FUNCS([dup2])
AC_CHECK_FUNCS([select])
AC_CHECK_FUNCS([socket])
AC_CHECK_FUNCS([inet_ntoa])
AC_CHECK_FUNCS([memmove])
AC_CHECK_FUNCS([strchr])
AC_CHECK_FUNCS([atexit])
AC_CHECK_FUNCS([ftruncate])
AC_CHECK_FUNCS([strrchr])
AC_CHECK_FUNCS([strstr])
AC_CHECK_FUNCS([clock_gettime])
AC_CHECK_FUNCS([strcasecmp])
-AC_CHECK_FUNCS([sendmmsg])
AC_CHECK_FUNCS([recvmmsg])
AC_CHECK_FUNCS([kevent])
# if neither sys/epoll.h nor kevent are present, we should fail.
if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then
AC_MSG_ERROR([Both epoll and kevent unavailable on this OS])
fi
if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then
AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code])
fi
# checks (for kronosnetd)
if test "x$enable_kronosnetd" = xyes; then
AC_CHECK_HEADERS([security/pam_appl.h],
[AC_CHECK_LIB([pam], [pam_start])],
[AC_MSG_ERROR([Unable to find LinuxPAM devel files])])
AC_CHECK_HEADERS([security/pam_misc.h],
[AC_CHECK_LIB([pam_misc], [misc_conv])],
[AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])])
PKG_CHECK_MODULES([libqb], [libqb])
AC_CHECK_LIB([qb], [qb_log_thread_priority_set],
[have_qb_log_thread_priority_set="yes"],
[have_qb_log_thread_priority_set="no"])
if test "x${have_qb_log_thread_priority_set}" = xyes; then
AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], 1, [have qb_log_thread_priority_set])
fi
fi
# local options
# The third argument of AC_ARG_ENABLE runs when the option IS given and the
# fourth when it is NOT given. The defaults therefore belong in the fourth
# slot, and must set the enable_* variables that are tested further down.
AC_ARG_ENABLE([debug],
[ --enable-debug enable debug build. ],,
[ enable_debug="no" ])
AC_ARG_ENABLE([publicandocs],
[ --enable-publicandocs enable docs build. ],,
[ enable_publicandocs="no" ])
AC_ARG_WITH([initdefaultdir],
[ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ],
[ INITDEFAULTDIR="$withval" ],
[ INITDEFAULTDIR="$sysconfdir/default" ])
AC_ARG_WITH([initddir],
[ --with-initddir=DIR : path to init script directory. ],
[ INITDDIR="$withval" ],
[ INITDDIR="$sysconfdir/init.d" ])
AC_ARG_WITH([systemddir],
[ --with-systemddir=DIR : path to systemd unit files directory. ],
[ SYSTEMDDIR="$withval" ],
[ SYSTEMDDIR="$systemddir" ])
AC_ARG_WITH([syslogfacility],
[ --with-syslogfacility=FACILITY
default syslog facility. ],
[ SYSLOGFACILITY="$withval" ],
[ SYSLOGFACILITY="LOG_DAEMON" ])
AC_ARG_WITH([sysloglevel],
[ --with-sysloglevel=LEVEL
default syslog level. ],
[ SYSLOGLEVEL="$withval" ],
[ SYSLOGLEVEL="LOG_INFO" ])
AC_ARG_WITH([defaultadmgroup],
[ --with-defaultadmgroup=GROUP
define PAM group. Users part of this group will be
allowed to configure kronosnet. Others will only
receive read-only rights. ],
[ DEFAULTADMGROUP="$withval" ],
[ DEFAULTADMGROUP="kronosnetadm" ])
## random vars
LOGDIR=${localstatedir}/log/
RUNDIR=${localstatedir}/run/
DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet
## do subst
AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes])
AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes])
AC_SUBST([DEFAULT_CONFIG_DIR])
AC_SUBST([INITDEFAULTDIR])
AC_SUBST([INITDDIR])
AC_SUBST([SYSTEMDDIR])
AC_SUBST([LOGDIR])
AC_SUBST([DEFAULTADMGROUP])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR],
["$(eval echo ${DEFAULT_CONFIG_DIR})"],
[Default config directory])
AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE],
["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"],
[Default config file])
AC_DEFINE_UNQUOTED([LOGDIR],
["$(eval echo ${LOGDIR})"],
[Default logging directory])
AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE],
["$(eval echo ${LOGDIR}/kronosnetd.log)"],
[Default log file])
AC_DEFINE_UNQUOTED([RUNDIR],
["$(eval echo ${RUNDIR})"],
[Default run directory])
AC_DEFINE_UNQUOTED([SYSLOGFACILITY],
[$(eval echo ${SYSLOGFACILITY})],
[Default syslog facility])
AC_DEFINE_UNQUOTED([SYSLOGLEVEL],
[$(eval echo ${SYSLOGLEVEL})],
[Default syslog level])
AC_DEFINE_UNQUOTED([DEFAULTADMGROUP],
["$(eval echo ${DEFAULTADMGROUP})"],
[Default admin group])
## *FLAGS handling
ENV_CFLAGS="$CFLAGS"
ENV_CPPFLAGS="$CPPFLAGS"
ENV_LDFLAGS="$LDFLAGS"
# debug build stuff
if test "x${enable_debug}" = xyes; then
AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code])
OPT_CFLAGS="-O0"
else
OPT_CFLAGS="-O3"
fi
# gdb flags
if test "x${GCC}" = xyes; then
GDB_FLAGS="-ggdb3"
else
GDB_FLAGS="-g"
fi
# extra warnings
EXTRA_WARNINGS=""
WARNLIST="
all
shadow
missing-prototypes
missing-declarations
strict-prototypes
declaration-after-statement
pointer-arith
write-strings
cast-align
bad-function-cast
missing-format-attribute
format=2
format-security
format-nonliteral
no-long-long
unsigned-char
gnu89-inline
no-strict-aliasing
error
address
cpp
overflow
parentheses
sequence-point
switch
uninitialized
unused-but-set-variable
unused-function
unused-result
unused-value
unused-variable
"
for j in $WARNLIST; do
if cc_supports_flag -W$j; then
EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j";
fi
done
CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \
$EXTRA_WARNINGS $WERROR_CFLAGS"
CPPFLAGS="$ENV_CPPFLAGS"
LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed"
AC_CONFIG_FILES([
Makefile
init/Makefile
libtap/Makefile
libtap/libtap.pc
kronosnetd/Makefile
kronosnetd/kronosnetd.logrotate
libknet/Makefile
libknet/libknet.pc
libknet/tests/Makefile
docs/Makefile
poc-code/Makefile
poc-code/iov-hash/Makefile
poc-code/access-list/Makefile
])
AC_OUTPUT
diff --git a/libknet/compat.c b/libknet/compat.c
index e4d0e096..a7b5c244 100644
--- a/libknet/compat.c
+++ b/libknet/compat.c
@@ -1,193 +1,159 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Author: Jan Friesse <jfriesse@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <sys/syscall.h>
#include "compat.h"
-#ifndef HAVE_SENDMMSG
-int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags)
-{
-#ifdef SYS_sendmmsg
- /*
- * For systems where kernel supports sendmmsg but glibc doesn't (RHEL 6)
- */
- return (syscall(SYS_sendmmsg, sockfd, msgvec, vlen, flags));
-#else
- /*
- * Generic implementation of sendmmsg using sendmsg
- */
- unsigned int i;
- ssize_t ret;
-
- if (vlen == 0) {
- return (0);
- }
-
- for (i = 0; i < vlen; i++) {
- ret = sendmsg(sockfd, &msgvec[i].msg_hdr, flags);
- if (ret >= 0) {
- msgvec[i].msg_len = ret;
- } else {
- break ;
- }
- }
-
- return ((ret >= 0) ? vlen : ret);
-#endif
-}
-#endif
-
#ifndef HAVE_RECVMMSG
/*
 * Replacement for recvmmsg() on systems where the C library does not
 * provide it.
 *
 * sockfd:  socket to read from
 * msgvec:  array of vlen message headers; msg_len of every filled entry
 *          is set to the number of bytes received for that entry
 * vlen:    number of entries in msgvec
 * flags:   passed through to the underlying receive call
 * timeout: only supported by the syscall path (see below)
 *
 * Returns the number of messages stored in msgvec, 0 if nothing could be
 * read because the socket would block (EAGAIN), or -1 with errno set.
 */
extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
		    unsigned int flags, struct timespec *timeout)
{
#ifdef SYS_recvmmsg
	/*
	 * For systems where kernel supports recvmmsg but glibc doesn't (RHEL 6)
	 */
	return (syscall(SYS_recvmmsg, sockfd, msgvec, vlen, flags, timeout));
#else
	/*
	 * Generic implementation of recvmmsg using recvmsg
	 */
	unsigned int i;
	ssize_t ret;

	if (vlen == 0) {
		return (0);
	}

	if ((timeout != NULL) || (flags & MSG_WAITFORONE)) {
		/*
		 * Not implemented
		 */
		errno = EINVAL;
		return (-1);
	}

	for (i = 0; i < vlen; i++) {
		ret = recvmsg(sockfd, &msgvec[i].msg_hdr, flags);
		if (ret < 0) {
			break;
		}
		msgvec[i].msg_len = ret;
	}

	/*
	 * Messages that were already pulled off the socket must be handed
	 * to the caller even if a later recvmsg failed, otherwise they are
	 * silently lost.
	 */
	if (i > 0) {
		return ((int)i);
	}

	/*
	 * vlen > 0 and nothing was stored, so the very first recvmsg failed.
	 * A would-block condition is reported as "0 messages", everything
	 * else as an error.
	 */
	if (errno == EAGAIN) {
		return (0);
	}

	return (-1);
#endif
}
#endif
#ifndef HAVE_SYS_EPOLL_H
#ifdef HAVE_KEVENT
/* for FreeBSD which has kevent instead of epoll */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/errno.h>
/*
 * Translate a poll(2)-style event mask into the kevent filter value used
 * by the epoll emulation: POLLIN contributes EVFILT_READ and POLLOUT
 * contributes EVFILT_WRITE. Returns 0 when neither bit is set.
 *
 * NOTE(review): the two filter values are OR-ed together when both bits
 * are requested; confirm that the EVFILT_* constants on the target
 * platform are meant to be combined this way.
 */
static int32_t
_poll_to_filter_(int32_t event)
{
	int32_t filter = 0;

	if ((event & POLLIN) != 0) {
		filter |= EVFILT_READ;
	}
	if ((event & POLLOUT) != 0) {
		filter |= EVFILT_WRITE;
	}

	return filter;
}
/*
 * epoll_create() stand-in for kevent based systems.
 *
 * Returns whatever kqueue() returns. The size argument is not used.
 */
int epoll_create(int size)
{
return kqueue();
}
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
{
int ret = 0;
struct kevent ke;
short filters = _poll_to_filter_(event->events);
switch (op) {
/* The kevent man page says that EV_ADD also does MOD */
case EPOLL_CTL_ADD:
case EPOLL_CTL_MOD:
EV_SET(&ke, fd, filters, EV_ADD | EV_ENABLE, 0, 0, event->data.ptr);
break;
case EPOLL_CTL_DEL:
EV_SET(&ke, fd, filters, EV_DELETE, 0, 0, event->data.ptr);
break;
default:
errno = EINVAL;
return -1;
}
ret = kevent(epfd, &ke, 1, NULL, 0, NULL);
return ret;
}
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms)
{
struct kevent kevents[maxevents];
struct timespec timeout = { 0, 0 };
struct timespec *timeout_ptr = &timeout;
uint32_t revents;
int event_count;
int i;
int returned_events;
if (timeout_ms != -1) {
timeout.tv_sec = timeout_ms/1000;
timeout.tv_nsec += (timeout_ms % 1000) * 1000000ULL;
}
else {
timeout_ptr = NULL;
}
event_count = kevent(epfd, NULL, 0, kevents, maxevents, timeout_ptr);
if (event_count == -1) {
return -1;
}
returned_events = 0;
for (i = 0; i < event_count; i++) {
revents = 0;
if (kevents[i].flags & EV_ERROR) {
revents |= POLLERR;
}
if (kevents[i].flags & EV_EOF) {
revents |= POLLHUP;
}
if (kevents[i].filter == EVFILT_READ) {
revents |= POLLIN;
}
if (kevents[i].filter == EVFILT_WRITE) {
revents |= POLLOUT;
}
events[returned_events].events = revents;
events[returned_events].data.ptr = kevents[i].udata;
returned_events++;
}
return returned_events;
}
#endif /* HAVE_KEVENT */
#endif /* HAVE_SYS_EPOLL_H */
diff --git a/libknet/compat.h b/libknet/compat.h
index 05d6116b..48297869 100644
--- a/libknet/compat.h
+++ b/libknet/compat.h
@@ -1,73 +1,66 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Authors: Jan Friesse <jfriesse@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __COMPAT_H__
#define __COMPAT_H__
#include "config.h"
#include <sys/socket.h>
#include <stdint.h>
/* FreeBSD has recvmmsg but it's a buggy wrapper */
#ifdef __FreeBSD__
#define recvmmsg COMPAT_recvmmsg
-#define sendmmsg COMPAT_sendmmsg
#undef HAVE_RECVMMSG
-#undef HAVE_SENDMMSG
#endif
#ifndef MSG_WAITFORONE
#define MSG_WAITFORONE 0x10000
#endif
-#ifndef HAVE_SENDMMSG
-extern int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags);
-#endif
-
#ifndef HAVE_RECVMMSG
extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout);
#endif
#ifndef ETIME
#define ETIME ETIMEDOUT
#endif
#ifdef HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#else
#ifdef HAVE_KEVENT
#include <poll.h>
#define EPOLL_CTL_ADD 1
#define EPOLL_CTL_MOD 2
#define EPOLL_CTL_DEL 3
#define EPOLLIN POLLIN
#define EPOLLOUT POLLOUT
/*
 * User data attached to an epoll_event, for builds without <sys/epoll.h>.
 * All members share the same storage. The kevent based shim in compat.c
 * stores and returns the ptr member.
 */
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
/*
 * Event descriptor used by the epoll_* replacement functions declared
 * below. The events mask is built from poll(2) bits: EPOLLIN and EPOLLOUT
 * are defined above as POLLIN and POLLOUT.
 */
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms);
#endif /* HAVE_KEVENT */
#endif /* HAVE_SYS_EPOLL_H */
#endif /* __COMPAT_H__ */
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 4cad4374..23962c8d 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,609 +1,609 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "compat.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
/*
 * Send the msgs_to_send prepared messages in msg to dst_host.
 *
 * For each active link of the host, every message is pointed at the link
 * destination address and the whole vector is sent on the link outsock.
 * The transport_tx_sock_error callback of the link transport decides how
 * a send result is treated:
 *   -1  give up and return
 *    0  carry on
 *    1  send the same data again
 * A short send is retried from the first unsent message. One retry that
 * makes no progress is tolerated, a second one fails with EAGAIN.
 *
 * With KNET_LINK_POLICY_RR and more than one active link, the link that
 * was just used is moved to the end of active_links and no further link
 * is used for this call.
 *
 * Returns the last err value, with errno set to the matching saved errno.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0;
struct knet_mmsghdr *cur;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sent_msgs = 0;
prev_sent = 0;
progress = 1;
msg_idx = 0;
/* address every message to the destination of the current link */
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
msg_idx++;
}
retry:
/* resume from the first message that has not been sent yet */
cur = &msg[prev_sent];
- sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
- (struct mmsghdr *)&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
+ sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
+ &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
/* keep errno from the send, the calls below may overwrite it */
savederrno = errno;
err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
switch(err) {
case -1: /* unrecoverable error */
goto out_unlock;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
goto retry;
break;
}
/*
 * NOTE(review): sent_msgs is added even when it is negative and the
 * transport asked to ignore the error -- confirm this is harmless
 * (prev_sent is reset at the top of the next link iteration).
 */
prev_sent = prev_sent + sent_msgs;
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
/*
 * short send: retry while data is moving, and allow a single
 * retry after a send that moved nothing
 */
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
#endif
goto retry;
}
/* two sends in a row without progress: report EAGAIN */
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
/* rotate: shift the list left by one and append the old first entry */
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
/* round-robin uses a single link per call */
break;
}
}
out_unlock:
errno = savederrno;
return err;
}
/*
 * Process one packet stored in knet_h->recv_from_sock_buf[buf_idx] and
 * send it to its destination host(s).
 *
 * knet_h:  knet handle
 * buf_idx: index of the receive buffer holding the packet
 * inlen:   length of the user data in that buffer
 * channel: data channel the packet belongs to
 * is_sync: non-zero when called on behalf of knet_send_sync; in that case
 *          only unicast packets with a single destination are accepted
 *
 * The function works out the destinations (via dst_host_filter_fn for
 * data packets, via the hostinfo header for host info packets), drops the
 * packet early when no destination is reachable, assigns the next TX
 * sequence number, splits the payload into fragments of at most data_mtu
 * bytes, optionally encrypts every fragment and finally hands the
 * fragments to _dispatch_to_links for each destination host.
 *
 * Returns 0 on success and -1 on failure with errno set:
 *   ECANCELED  data forwarding is disabled
 *   EFAULT     dst_host_filter_fn reported an error
 *   EINVAL     unicast packet without destinations
 *   ENOMSG     unknown packet type
 *   E2BIG      sync send that is not unicast to exactly one host
 *   EHOSTDOWN  no reachable destination
 *   ECHILD     encryption failed
 * or the error reported by the sequence number mutex or by
 * _dispatch_to_links.
 */
static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
{
	ssize_t outlen, frag_len;
	struct knet_host *dst_host;
	knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
	size_t dst_host_ids_entries_temp = 0;
	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	struct knet_hostinfo *knet_hostinfo;
	struct iovec iov_out[PCKT_FRAG_MAX];
	uint8_t frag_idx;
	unsigned int temp_data_mtu;
	int host_idx;
	int send_mcast = 0;
	struct knet_header *inbuf;
	int savederrno = 0;
	int err = 0;
	seq_num_t tx_seq_num;
	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
	int msgs_to_send, msg_idx;

	inbuf = knet_h->recv_from_sock_buf[buf_idx];

	if ((knet_h->enabled != 1) &&
	    (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
		log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
		savederrno = ECANCELED;
		err = -1;
		goto out_unlock;
	}

	/*
	 * move this into a separate function to expand on
	 * extra switching rules
	 */
	switch(inbuf->kh_type) {
		case KNET_HEADER_TYPE_DATA:
			if (knet_h->dst_host_filter_fn) {
				bcast = knet_h->dst_host_filter_fn(
						knet_h->dst_host_filter_fn_private_data,
						(const unsigned char *)inbuf->khp_data_userdata,
						inlen,
						KNET_NOTIFY_TX,
						knet_h->host_id,
						knet_h->host_id,
						&channel,
						dst_host_ids_temp,
						&dst_host_ids_entries_temp);
				if (bcast < 0) {
					log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
					savederrno = EFAULT;
					err = -1;
					goto out_unlock;
				}

				if ((!bcast) && (!dst_host_ids_entries_temp)) {
					log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
					savederrno = EINVAL;
					err = -1;
					goto out_unlock;
				}
			}
			break;
		case KNET_HEADER_TYPE_HOST_INFO:
			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
				bcast = 0;
				dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
				dst_host_ids_entries_temp = 1;
				knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
			}
			break;
		default:
			log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
			savederrno = ENOMSG;
			err = -1;
			goto out_unlock;
			break;
	}

	if (is_sync) {
		if ((bcast) ||
		    ((!bcast) && (dst_host_ids_entries_temp > 1))) {
			log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
			savederrno = E2BIG;
			err = -1;
			goto out_unlock;
		}
	}

	/*
	 * check destinations hosts before spending time
	 * in fragmenting/encrypting packets to save
	 * time processing data for unreachable hosts.
	 * for unicast, also remap the destination data
	 * to skip unreachable hosts.
	 */
	if (!bcast) {
		dst_host_ids_entries = 0;
		for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
			dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
			if (!dst_host) {
				continue;
			}
			if (dst_host->status.reachable) {
				dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
				dst_host_ids_entries++;
			}
		}
		if (!dst_host_ids_entries) {
			savederrno = EHOSTDOWN;
			err = -1;
			goto out_unlock;
		}
	} else {
		send_mcast = 0;
		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
			if (dst_host->status.reachable) {
				send_mcast = 1;
				break;
			}
		}
		if (!send_mcast) {
			savederrno = EHOSTDOWN;
			err = -1;
			goto out_unlock;
		}
	}

	if (!knet_h->data_mtu) {
		/*
		 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
		 */
		log_debug(knet_h, KNET_SUB_TX,
			  "Received data packet but data MTU is still unknown."
			  " Packet might not be delivered."
			  " Assuming mininum IPv4 mtu (%d)",
			  KNET_PMTUD_MIN_MTU_V4);
		temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
	} else {
		/*
		 * take a copy of the mtu to avoid value changing under
		 * our feet while we are sending a fragmented pckt
		 */
		temp_data_mtu = knet_h->data_mtu;
	}

	/*
	 * prepare the outgoing buffers
	 */
	frag_len = inlen;
	frag_idx = 0;

	inbuf->khp_data_bcast = bcast;
	inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
	inbuf->khp_data_channel = channel;

	/*
	 * a failed lock must be reported to the caller: without err and
	 * savederrno being set the function would return success although
	 * nothing has been sent
	 */
	savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex);
	if (savederrno) {
		log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
		err = -1;
		goto out_unlock;
	}
	knet_h->tx_seq_num++;
	/*
	 * force seq_num 0 to detect a node that has crashed and rejoining
	 * the knet instance. seq_num 0 will clear the buffers in the RX
	 * thread
	 */
	if (knet_h->tx_seq_num == 0) {
		knet_h->tx_seq_num++;
	}
	/*
	 * cache the value in locked context
	 */
	tx_seq_num = knet_h->tx_seq_num;
	inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
	pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);

	/*
	 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
	 * pckts.
	 * this solves 2 problems:
	 * 1) on TX socket overloads we generate extra pings to keep links alive
	 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
	 *    node 3+ will be able to keep in sync on the TX seq_num even without
	 *    receiving traffic or pings in betweens. This avoids issues with
	 *    rollover of the circular buffer
	 */
	if (tx_seq_num % (SEQ_MAX / 8) == 0) {
		_send_pings(knet_h, 0);
	}

	if (inbuf->khp_data_frag_num > 1) {
		while (frag_idx < inbuf->khp_data_frag_num) {
			/*
			 * set the iov_base
			 */
			iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];

			/*
			 * set the len
			 */
			if (frag_len > temp_data_mtu) {
				iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
			} else {
				iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
			}

			/*
			 * copy the frag info on all buffers
			 */
			knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
			knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
			knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
			knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
			knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;

			memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
				inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
				iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);

			frag_len = frag_len - temp_data_mtu;
			frag_idx++;
		}
	} else {
		/* single fragment: send straight from the receive buffer */
		iov_out[frag_idx].iov_base = (void *)inbuf;
		iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
	}

	if (knet_h->crypto_instance) {
		/* replace every fragment with its encrypted and signed copy */
		frag_idx = 0;
		while (frag_idx < inbuf->khp_data_frag_num) {
			if (crypto_encrypt_and_sign(
					knet_h,
					(const unsigned char *)iov_out[frag_idx].iov_base,
					iov_out[frag_idx].iov_len,
					knet_h->send_to_links_buf_crypt[frag_idx],
					&outlen) < 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
				savederrno = ECHILD;
				err = -1;
				goto out_unlock;
			}
			iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
			iov_out[frag_idx].iov_len = outlen;
			frag_idx++;
		}
	}

	/* one message header per fragment, each with a single iovec */
	memset(&msg, 0, sizeof(msg));

	msgs_to_send = inbuf->khp_data_frag_num;

	msg_idx = 0;

	while (msg_idx < msgs_to_send) {
		msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx];
		msg[msg_idx].msg_hdr.msg_iovlen = 1;
		msg_idx++;
	}

	if (!bcast) {
		for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
			dst_host = knet_h->host_index[dst_host_ids[host_idx]];

			err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
			savederrno = errno;
			if (err) {
				goto out_unlock;
			}
		}
	} else {
		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
			if (dst_host->status.reachable) {
				err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
				savederrno = errno;
				if (err) {
					goto out_unlock;
				}
			}
		}
	}

out_unlock:
	errno = savederrno;
	return err;
}
/*
 * Synchronously send buff_len bytes from buff on the given channel.
 *
 * knet_h:   knet handle, must not be NULL
 * buff:     data to send, must not be NULL
 * buff_len: number of bytes, 1..KNET_MAX_PACKET_SIZE
 * channel:  data channel, 0..KNET_DATAFD_MAX-1, and it must be in use
 *
 * The data is copied into the first receive buffer and processed by
 * _parse_recv_from_sock in sync mode while holding the global read lock
 * and the TX mutex.
 *
 * Returns 0 on success, -1 on error with errno set. Invalid arguments and
 * a channel that is not in use are reported as EINVAL; lock failures are
 * reported with the error returned by the pthread call.
 */
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int rc = 0;
	int saved = 0;

	/* every argument problem is reported the same way */
	if ((!knet_h) ||
	    (buff == NULL) ||
	    (buff_len <= 0) ||
	    (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) ||
	    (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved) {
		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (!knet_h->sockfd[channel].in_use) {
		saved = EINVAL;
		rc = -1;
	} else {
		saved = pthread_mutex_lock(&knet_h->tx_mutex);
		if (saved) {
			log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
				strerror(saved));
			rc = -1;
		} else {
			/* stage the caller data as a regular data packet */
			knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
			memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);

			rc = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
			saved = errno;

			pthread_mutex_unlock(&knet_h->tx_mutex);
		}
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = saved;
	return rc;
}
/*
 * Read pending data from sockfd and push every received packet through
 * _parse_recv_from_sock.
 *
 * knet_h:  knet handle
 * sockfd:  descriptor that became readable
 * channel: data channel owning sockfd (the caller passes -1 for the
 *          host info socket)
 * msg:     array of PCKT_FRAG_MAX message headers used for recvmmsg
 * type:    header type to stamp on the received packets
 *
 * A valid channel that is not flagged as a socket is read with a single
 * readv into the first receive buffer; everything else is read with
 * recvmmsg. On a read error (or a zero length read) sock_notify_fn is
 * called; on a negative result the descriptor is also removed from the
 * epoll set and the channel is flagged with has_error.
 *
 * NOTE(review): the code after the out label indexes knet_h->sockfd with
 * channel without a range check -- confirm this path cannot be reached
 * with a channel outside 0..KNET_DATAFD_MAX-1.
 */
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct knet_mmsghdr *msg, int type)
{
	ssize_t inlen = 0;
	struct iovec iov_in;
	int msg_recv, i;
	int savederrno = 0, docallback = 0;

	if ((channel >= 0) &&
	    (channel < KNET_DATAFD_MAX) &&
	    (!knet_h->sockfd[channel].is_socket)) {
		memset(&iov_in, 0, sizeof(iov_in));
		iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
		iov_in.iov_len = KNET_MAX_PACKET_SIZE;

		inlen = readv(sockfd, &iov_in, 1);

		if (inlen <= 0) {
			savederrno = errno;
			docallback = 1;
			goto out;
		}

		msg_recv = 1;
		knet_h->recv_from_sock_buf[0]->kh_type = type;
		_parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
	} else {
		msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
		if (msg_recv < 0) {
			inlen = msg_recv;
			savederrno = errno;
			docallback = 1;
			goto out;
		}
		for (i = 0; i < msg_recv; i++) {
			inlen = msg[i].msg_len;
			if (inlen == 0) {
				/* zero length message: notify and stop processing */
				savederrno = 0;
				docallback = 1;
				goto out;
			}
			knet_h->recv_from_sock_buf[i]->kh_type = type;
			_parse_recv_from_sock(knet_h, i, inlen, channel, 0);
		}
	}

out:
	if (inlen < 0) {
		struct epoll_event ev;

		memset(&ev, 0, sizeof(struct epoll_event));

		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
			/*
			 * report why epoll_ctl failed; savederrno holds the
			 * earlier read error and is still passed to the
			 * notification callback below
			 */
			log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(errno));
		} else {
			knet_h->sockfd[channel].has_error = 1;
		}
	}

	if (docallback) {
		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
				       knet_h->sockfd[channel].sockfd[0],
				       channel,
				       KNET_NOTIFY_TX,
				       inlen,
				       savederrno);
	}
}
/*
 * TX thread entry point.
 *
 * data is the knet handle. After preparing the receive message headers
 * and stamping the static header fields of the receive and send buffers,
 * the thread loops until shutdown_in_progress() reports true: it waits on
 * send_to_links_epollfd and, for each ready descriptor, works out the
 * packet type and channel and calls _handle_send_to_links while holding
 * the global read lock and the TX mutex.
 *
 * Always returns NULL.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	/*
	 * epoll_wait below is allowed to return up to
	 * KNET_EPOLL_MAX_EVENTS + 1 events, so the array must have room
	 * for that many entries
	 */
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1];
	struct sockaddr_storage address[PCKT_FRAG_MAX];
	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
	struct iovec iov_in[PCKT_FRAG_MAX];
	int i, nev, type;
	int8_t channel;

	memset(&msg, 0, sizeof(msg));

	/* preparing data buffer */
	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
		iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;

		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));

		msg[i].msg_hdr.msg_name = &address[i];
		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[i].msg_hdr.msg_iov = &iov_in[i];
		msg[i].msg_hdr.msg_iovlen = 1;

		knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
		knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);

		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		/* a negative nev (epoll_wait failure) simply skips the loop */
		for (i = 0; i < nev; i++) {
			if (events[i].data.fd == knet_h->hostsockfd[0]) {
				type = KNET_HEADER_TYPE_HOST_INFO;
				channel = -1;
			} else {
				type = KNET_HEADER_TYPE_DATA;
				/*
				 * look up the channel owning this descriptor.
				 * NOTE(review): when nothing matches, channel
				 * ends up equal to KNET_DATAFD_MAX -- confirm
				 * the callee copes with that value.
				 */
				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
					if ((knet_h->sockfd[channel].in_use) &&
					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
						break;
					}
				}
			}
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, events[i].data.fd, channel, &msg[0], type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}
		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	return NULL;
}
diff --git a/libknet/transport_common.c b/libknet/transport_common.c
index a9eb59e7..d56d4e0e 100644
--- a/libknet/transport_common.c
+++ b/libknet/transport_common.c
@@ -1,403 +1,428 @@
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transports.h"
+/*
+ * reuse Jan Friesse's compat layer as wrapper to drop usage of sendmmsg
+ *
+ * TODO: kill those wrappers once we work on packet delivery guaranteed
+ */
+/*
+ * Send vlen messages from msgvec on sockfd, one sendmsg call per entry,
+ * passing flags to every call.
+ *
+ * For each message that is sent, msg_len is set to the value returned by
+ * sendmsg. The loop stops at the first failing sendmsg.
+ *
+ * Returns vlen when the last sendmsg call succeeded (0 when vlen is 0),
+ * otherwise the negative sendmsg result with errno preserved.
+ *
+ * NOTE(review): when a send fails after earlier entries went out, the
+ * number of entries already sent is not reported -- confirm callers that
+ * retry do not depend on it.
+ */
+int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags)
+{
+ int savederrno = 0, err = 0;
+ unsigned int i;
+
+ for (i = 0; i < vlen; i++) {
+ err = sendmsg(sockfd, &msgvec[i].msg_hdr, flags);
+ savederrno = errno; /* keep errno of the last sendmsg call */
+ if (err >= 0) {
+ msgvec[i].msg_len = err;
+ } else {
+ break; /* stop at the first failure */
+ }
+ }
+
+ errno = savederrno;
+ return ((err >= 0) ? vlen : err);
+}
+
/*
 * Apply the socket options shared by all knet sockets.
 *
 * knet_h: knet handle, used for logging
 * sock:   socket descriptor to configure
 * type:   human readable socket description used in log messages
 *
 * Sets close-on-exec and non-blocking mode and then requests
 * KNET_RING_RCVBUFF bytes for both the receive and the send buffer. The
 * *FORCE variant of the buffer options is used when it is available at
 * build time.
 *
 * NOTE(review): the send buffer is sized with KNET_RING_RCVBUFF as well --
 * confirm that sharing the receive constant is intended.
 *
 * Returns 0 on success, -1 on the first failure with errno set to the
 * error of the failing call.
 */
int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type)
{
	int rc = 0, saved_errno = 0;
	int bufsize;

	if (_fdset_cloexec(sock) != 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s",
			type, strerror(saved_errno));
		goto done;
	}

	if (_fdset_nonblock(sock) != 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s",
			type, strerror(saved_errno));
		goto done;
	}

	/* receive buffer */
	bufsize = KNET_RING_RCVBUFF;
#ifdef SO_RCVBUFFORCE
	if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &bufsize, sizeof(bufsize)) < 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s",
			type, strerror(saved_errno));
		goto done;
	}
#else
	if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)) < 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_RECVBUF: %s",
			type, strerror(saved_errno));
		goto done;
	}
#endif

	/* send buffer */
	bufsize = KNET_RING_RCVBUFF;
#ifdef SO_SNDBUFFORCE
	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &bufsize, sizeof(bufsize)) < 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s",
			type, strerror(saved_errno));
		goto done;
	}
#else
	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) < 0) {
		saved_errno = errno;
		rc = -1;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_SNDBUF: %s",
			type, strerror(saved_errno));
		goto done;
	}
#endif

done:
	errno = saved_errno;
	return rc;
}
/*
 * Configure a socket that is going to carry link traffic.
 *
 * On top of the common options (_configure_common_socket) this sets:
 *  - IP_FREEBIND / IP_BINDANY, whichever the platform defines
 *  - IPV6_V6ONLY when address is AF_INET6
 *  - path MTU discovery in PROBE mode where *_MTU_DISCOVER is defined,
 *    the DONTFRAG option otherwise
 *  - SO_REUSEADDR
 *
 * "type" is only used to label the log messages.
 *
 * return 0 on success (errno is set to 0)
 * return -1 on error, errno is set to the error of the failing call
 */
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type)
{
	int rc = -1;
	int saved_errno = 0;
	int optval;

	if (_configure_common_socket(knet_h, sock, type) < 0) {
		saved_errno = errno;
		goto out;
	}

#ifdef IP_FREEBIND
	optval = 1;
	if (setsockopt(sock, SOL_IP, IP_FREEBIND, &optval, sizeof(optval)) < 0) {
		saved_errno = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s",
			type, strerror(saved_errno));
		goto out;
	}
#endif
#ifdef IP_BINDANY /* BSD */
	optval = 1;
	if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &optval, sizeof(optval)) < 0) {
		saved_errno = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s",
			type, strerror(saved_errno));
		goto out;
	}
#endif

	if (address->ss_family == AF_INET6) {
		optval = 1;
		if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
			       &optval, sizeof(optval)) < 0) {
			saved_errno = errno;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s",
				type, strerror(saved_errno));
			goto out;
		}
#ifdef IPV6_MTU_DISCOVER
		optval = IPV6_PMTUDISC_PROBE;
		if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &optval, sizeof(optval)) < 0) {
			saved_errno = errno;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
				type, strerror(saved_errno));
			goto out;
		}
#else
		optval = 1;
		if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &optval, sizeof(optval)) < 0) {
			saved_errno = errno;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
				type, strerror(saved_errno));
			goto out;
		}
#endif
	} else {
#ifdef IP_MTU_DISCOVER
		optval = IP_PMTUDISC_PROBE;
		if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &optval, sizeof(optval)) < 0) {
			saved_errno = errno;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s",
				type, strerror(saved_errno));
			goto out;
		}
#else
		optval = 1;
		if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &optval, sizeof(optval)) < 0) {
			saved_errno = errno;
			log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s",
				type, strerror(saved_errno));
			goto out;
		}
#endif
	}

	optval = 1;
	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
		saved_errno = errno;
		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s",
			type, strerror(saved_errno));
		goto out;
	}

	rc = 0;

out:
	errno = saved_errno;
	return rc;
}
/*
 * Create an AF_UNIX/SOCK_SEQPACKET socketpair in sock[0] and sock[1] and
 * apply the common knet socket options to both ends.
 *
 * return 0 on success (errno is set to 0)
 * return -1 on error, errno is set to the error of the failing call
 *
 * Nothing is closed here on failure.
 */
int _init_socketpair(knet_handle_t knet_h, int *sock)
{
	int saved_errno;
	int end;

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) {
		saved_errno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s",
			strerror(saved_errno));
		errno = saved_errno;
		return -1;
	}

	for (end = 0; end < 2; end++) {
		if (_configure_common_socket(knet_h, sock[end], "local socketpair") < 0) {
			/* hand the callee's errno back to our caller unchanged */
			saved_errno = errno;
			errno = saved_errno;
			return -1;
		}
	}

	errno = 0;
	return 0;
}
/*
 * Close both ends of a socketpair and reset the slots to 0.
 *
 * A slot holding 0 is treated as "not open" and is skipped.
 * knet_h is not used.
 */
void _close_socketpair(knet_handle_t knet_h, int *sock)
{
	int *fd;

	for (fd = sock; fd < sock + 2; fd++) {
		if (*fd == 0) {
			continue;
		}
		close(*fd);
		*fd = 0;
	}
}
/*
 * must be called with global read lock
 *
 * Tell whether sockfd has a transport recorded in the fd tracker.
 *
 * return -1 on error (sockfd cannot index the tracker, errno = EINVAL)
 * return 0 if fd is invalid (recorded transport is >= KNET_MAX_TRANSPORTS)
 * return 1 if fd is valid
 */
int _is_valid_fd(knet_handle_t knet_h, int sockfd)
{
	/*
	 * Usable indexes are 0 .. KNET_MAX_FDS - 1, so KNET_MAX_FDS itself
	 * has to be rejected as well.
	 * NOTE(review): assumes knet_transport_fd_tracker is declared with
	 * KNET_MAX_FDS entries - confirm against the handle definition.
	 */
	if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
		errno = EINVAL;
		return -1;
	}

	if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) {
		return 0;
	}

	return 1;
}
/*
 * must be called with global write lock
 *
 * Record transport, data_type and data for sockfd in the fd tracker.
 *
 * return 0 on success
 * return -1 on error (sockfd cannot index the tracker, errno = EINVAL)
 */
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data)
{
	/*
	 * Usable indexes are 0 .. KNET_MAX_FDS - 1; letting KNET_MAX_FDS
	 * through would write one slot past the end of the tracker.
	 * NOTE(review): assumes knet_transport_fd_tracker is declared with
	 * KNET_MAX_FDS entries - confirm against the handle definition.
	 */
	if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
		errno = EINVAL;
		return -1;
	}

	knet_h->knet_transport_fd_tracker[sockfd].transport = transport;
	knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type;
	knet_h->knet_transport_fd_tracker[sockfd].data = data;

	return 0;
}
/*
* public api
*/
/*
 * Fill transport_list with the name and id of every transport registered
 * in the handle and store the number of entries written in
 * *transport_list_entries.
 *
 * The caller provides the storage; no capacity is passed in, so it has to
 * be large enough for every registered transport (at most
 * KNET_MAX_TRANSPORTS entries are written).
 *
 * return 0 on success
 * return -1 on error with errno set (EINVAL for a NULL argument, or the
 *        error returned by the read lock)
 */
int knet_handle_get_transport_list(knet_handle_t knet_h,
				   struct transport_info *transport_list, size_t *transport_list_entries)
{
	int lock_err;
	int slot;
	int found = 0;

	if ((!knet_h) || (!transport_list) || (!transport_list_entries)) {
		errno = EINVAL;
		return -1;
	}

	lock_err = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (lock_err) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(lock_err));
		errno = lock_err;
		return -1;
	}

	/*
	 * The list is rebuilt on every call rather than once at
	 * knet_handle_new time, in case transports ever need to be
	 * initialized at runtime instead of at init time.
	 */
	for (slot = 0; slot < KNET_MAX_TRANSPORTS; slot++) {
		if (!knet_h->transport_ops[slot]) {
			continue;
		}
		transport_list[found].name = knet_h->transport_ops[slot]->transport_name;
		transport_list[found].id = knet_h->transport_ops[slot]->transport_id;
		found++;
	}

	*transport_list_entries = found;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Look up the name of the transport registered under id "transport".
 *
 * return the transport name, with errno set to 0
 * return NULL on error with errno set:
 *        EINVAL  knet_h is NULL or transport >= KNET_MAX_TRANSPORTS
 *        ENOENT  no transport is registered under that id
 *        or the error returned by the read lock
 */
const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport)
{
	const char *found = NULL;
	int rc;

	if ((!knet_h) || (transport >= KNET_MAX_TRANSPORTS)) {
		errno = EINVAL;
		return NULL;
	}

	rc = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(rc));
		errno = rc;
		return NULL;
	}

	/* rc is 0 from here on unless the slot turns out to be empty */
	if (!knet_h->transport_ops[transport]) {
		rc = ENOENT;
	} else {
		found = knet_h->transport_ops[transport]->transport_name;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = rc;
	return found;
}
/*
 * Look up the id of the registered transport whose name equals "name".
 *
 * return the transport id, with errno set to 0
 * return KNET_MAX_TRANSPORTS on error with errno set (EINVAL for a NULL
 *        argument or an unknown name, or the error returned by the read
 *        lock)
 */
uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name)
{
	uint8_t found_id = KNET_MAX_TRANSPORTS;
	int rc;
	int slot;

	if ((!knet_h) || (!name)) {
		errno = EINVAL;
		return found_id;
	}

	rc = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (rc) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(rc));
		errno = rc;
		return found_id;
	}

	for (slot = 0; slot < KNET_MAX_TRANSPORTS; slot++) {
		if (!knet_h->transport_ops[slot]) {
			continue;
		}
		if (strcmp(knet_h->transport_ops[slot]->transport_name, name) == 0) {
			found_id = knet_h->transport_ops[slot]->transport_id;
			break;
		}
	}

	/* rc is still 0 here; it only changes when nothing matched */
	if (found_id == KNET_MAX_TRANSPORTS) {
		rc = EINVAL;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = rc;
	return found_id;
}
diff --git a/libknet/transports.h b/libknet/transports.h
index abc7c7fe..e47f7128 100644
--- a/libknet/transports.h
+++ b/libknet/transports.h
@@ -1,24 +1,26 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __TRANSPORTS_H__
#define __TRANSPORTS_H__
/*
 * accessors returning the operations table of each transport
 * NOTE(review): defined outside transport_common.c - presumably by the
 * individual transport implementations, confirm
 */
knet_transport_ops_t *get_udp_transport(void);
knet_transport_ops_t *get_sctp_transport(void);

/*
 * socket setup helpers (transport_common.c)
 *
 * all of them return 0 on success and -1 on error with errno set;
 * "type" is only used to label log messages
 */
int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type);
int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type);

/*
 * create / close an AF_UNIX socketpair stored in sock[0] and sock[1]
 */
int _init_socketpair(knet_handle_t knet_h, int *sock);
void _close_socketpair(knet_handle_t knet_h, int *sock);

/*
 * fd tracker helpers
 *
 * _set_fd_tracker must be called with the global write lock,
 * _is_valid_fd with the global read lock
 */
int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data);
int _is_valid_fd(knet_handle_t knet_h, int sockfd);
+int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags);
+
#endif
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Feb 26, 7:11 PM (1 m, 33 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465669
Default Alt Text
(43 KB)
Attached To
Mode
rK kronosnet
Attached
Detach File
Event Timeline
Log In to Comment