diff --git a/configure.ac b/configure.ac index 74c00e53..af9b3b1d 100644 --- a/configure.ac +++ b/configure.ac @@ -1,400 +1,399 @@ # # Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # Federico Simoncelli # # This software licensed under GPL-2.0+, LGPL-2.0+ # # -*- Autoconf -*- # Process this file with autoconf to produce a configure script. # AC_PREREQ([2.63]) AC_INIT([kronosnet], m4_esyscmd([build-aux/git-version-gen .tarball-version]), [devel@lists.kronosnet.org]) AC_USE_SYSTEM_EXTENSIONS AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects]) LT_PREREQ([2.2.6]) LT_INIT AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_SRCDIR([kronosnetd/main.c]) AC_CONFIG_HEADERS([config.h]) AC_CANONICAL_HOST AC_PROG_LIBTOOL AC_LANG([C]) systemddir=${prefix}/lib/systemd/system if test "$prefix" = "NONE"; then prefix="/usr" if test "$localstatedir" = "\${prefix}/var"; then localstatedir="/var" fi if test "$sysconfdir" = "\${prefix}/etc"; then sysconfdir="/etc" fi if test "$systemddir" = "NONE/lib/systemd/system"; then systemddir=/lib/systemd/system fi if test "$libdir" = "\${exec_prefix}/lib"; then if test -e /usr/lib64; then libdir="/usr/lib64" else libdir="/usr/lib" fi fi fi # Checks for programs. if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then AC_MSG_ERROR(["you don't seem to have GNU make; it is required"]) fi AC_PROG_AWK AC_PROG_GREP AC_PROG_SED AC_PROG_CPP AC_PROG_CC AM_PROG_CC_C_O AC_PROG_LN_S AC_PROG_INSTALL AC_PROG_MAKE_SET AC_PROG_CXX AC_PROG_RANLIB AC_CHECK_PROGS([PUBLICAN], [publican], [:]) AC_CHECK_PROGS([PKGCONFIG], [pkg-config]) AC_ARG_ENABLE([poc], [ --enable-poc : build poc code ],, [ enable_poc="yes" ]) AM_CONDITIONAL([BUILD_POC], test x$enable_poc = xyes) AC_ARG_ENABLE([kronosnetd], [ --enable-kronosnetd : Kronosnetd support ],, [ enable_kronosnetd="no" ]) AM_CONDITIONAL([BUILD_KRONOSNETD], test x$enable_kronosnetd = xyes) AC_ARG_ENABLE([libtap], [ --enable-libtap : libtap support ],, [ enable_libtap="no" ]) if test "x$enable_kronosnetd" = xyes; then enable_libtap=yes fi AM_CONDITIONAL([BUILD_LIBTAP], test x$enable_libtap = xyes) AC_ARG_ENABLE([libknet-sctp], [ --enable-libknet-sctp : libknet SCTP support ],, [ enable_libknet_sctp="yes" ]) ## local helper functions # this function checks if CC support options passed as # args. Global CFLAGS are ignored during this test. cc_supports_flag() { saveCPPFLAGS="$CPPFLAGS" CPPFLAGS="$@" if echo $CC | grep -q clang; then CPPFLAGS="-Werror $CPPFLAGS" fi AC_MSG_CHECKING([whether $CC supports "$@"]) AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])], [RC=0; AC_MSG_RESULT([yes])], [RC=1; AC_MSG_RESULT([no])]) CPPFLAGS="$saveCPPFLAGS" return $RC } # helper macro to check libs without adding them to LIBS check_lib_no_libs() { lib_no_libs_arg1=$1 shift lib_no_libs_arg2=$1 shift lib_no_libs_args=$@ AC_CHECK_LIB([$lib_no_libs_arg1], [$lib_no_libs_arg2],,, [$lib_no_libs_args]) LIBS=$ac_check_lib_save_LIBS } # Checks for C features AC_C_INLINE # Checks for libraries. AC_CHECK_LIB([pthread], [pthread_create]) AC_CHECK_LIB([m], [ceil]) AC_CHECK_LIB([rt], [clock_gettime]) PKG_CHECK_MODULES([nss],[nss]) # Checks for header files. AC_CHECK_HEADERS([fcntl.h]) AC_CHECK_HEADERS([stdlib.h]) AC_CHECK_HEADERS([string.h]) AC_CHECK_HEADERS([strings.h]) AC_CHECK_HEADERS([sys/ioctl.h]) AC_CHECK_HEADERS([syslog.h]) AC_CHECK_HEADERS([unistd.h]) AC_CHECK_HEADERS([netinet/in.h]) AC_CHECK_HEADERS([sys/socket.h]) AC_CHECK_HEADERS([arpa/inet.h]) AC_CHECK_HEADERS([netdb.h]) AC_CHECK_HEADERS([limits.h]) AC_CHECK_HEADERS([stdint.h]) AC_CHECK_HEADERS([sys/epoll.h]) if test "x$enable_libknet_sctp" = xyes; then AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"])) fi # Checks for typedefs, structures, and compiler characteristics. AC_C_INLINE AC_TYPE_SIZE_T AC_TYPE_PID_T AC_TYPE_SSIZE_T AC_TYPE_UINT8_T AC_TYPE_UINT16_T AC_TYPE_UINT32_T AC_TYPE_UINT64_T AC_TYPE_INT32_T # Checks for library functions. AC_FUNC_ALLOCA AC_FUNC_FORK AC_FUNC_MALLOC AC_FUNC_REALLOC AC_CHECK_FUNCS([memset]) AC_CHECK_FUNCS([strdup]) AC_CHECK_FUNCS([strerror]) AC_CHECK_FUNCS([dup2]) AC_CHECK_FUNCS([select]) AC_CHECK_FUNCS([socket]) AC_CHECK_FUNCS([inet_ntoa]) AC_CHECK_FUNCS([memmove]) AC_CHECK_FUNCS([strchr]) AC_CHECK_FUNCS([atexit]) AC_CHECK_FUNCS([ftruncate]) AC_CHECK_FUNCS([strrchr]) AC_CHECK_FUNCS([strstr]) AC_CHECK_FUNCS([clock_gettime]) AC_CHECK_FUNCS([strcasecmp]) -AC_CHECK_FUNCS([sendmmsg]) AC_CHECK_FUNCS([recvmmsg]) AC_CHECK_FUNCS([kevent]) # if neither sys/epoll.h nor kevent are present, we should fail. if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then AC_MSG_ERROR([Both epoll and kevent unavailable on this OS]) fi if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code]) fi # checks (for kronosnetd) if test "x$enable_kronosnetd" = xyes; then AC_CHECK_HEADERS([security/pam_appl.h], [AC_CHECK_LIB([pam], [pam_start])], [AC_MSG_ERROR([Unable to find LinuxPAM devel files])]) AC_CHECK_HEADERS([security/pam_misc.h], [AC_CHECK_LIB([pam_misc], [misc_conv])], [AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])]) PKG_CHECK_MODULES([libqb], [libqb]) AC_CHECK_LIB([qb], [qb_log_thread_priority_set], [have_qb_log_thread_priority_set="yes"], [have_qb_log_thread_priority_set="no"]) if test "x${have_qb_log_thread_priority_set}" = xyes; then AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], 1, [have qb_log_thread_priority_set]) fi fi # local options AC_ARG_ENABLE([debug], [ --enable-debug enable debug build. ], [ default="no" ]) AC_ARG_ENABLE([publicandocs], [ --enable-publicandocs enable docs build. ], [ default="no" ]) AC_ARG_WITH([initdefaultdir], [ --with-initdefaultdir : path to /etc/sysconfig/.. or /etc/default dir. ], [ INITDEFAULTDIR="$withval" ], [ INITDEFAULTDIR="$sysconfdir/default" ]) AC_ARG_WITH([initddir], [ --with-initddir=DIR : path to init script directory. ], [ INITDDIR="$withval" ], [ INITDDIR="$sysconfdir/init.d" ]) AC_ARG_WITH([systemddir], [ --with-systemddir=DIR : path to systemd unit files directory. ], [ SYSTEMDDIR="$withval" ], [ SYSTEMDDIR="$systemddir" ]) AC_ARG_WITH([syslogfacility], [ --with-syslogfacility=FACILITY default syslog facility. ], [ SYSLOGFACILITY="$withval" ], [ SYSLOGFACILITY="LOG_DAEMON" ]) AC_ARG_WITH([sysloglevel], [ --with-sysloglevel=LEVEL default syslog level. ], [ SYSLOGLEVEL="$withval" ], [ SYSLOGLEVEL="LOG_INFO" ]) AC_ARG_WITH([defaultadmgroup], [ --with-defaultadmgroup=GROUP define PAM group. Users part of this group will be allowed to configure kronosnet. Others will only receive read-only rights. ], [ DEFAULTADMGROUP="$withval" ], [ DEFAULTADMGROUP="kronosnetadm" ]) ## random vars LOGDIR=${localstatedir}/log/ RUNDIR=${localstatedir}/run/ DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet ## do subst AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes]) AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes]) AC_SUBST([DEFAULT_CONFIG_DIR]) AC_SUBST([INITDEFAULTDIR]) AC_SUBST([INITDDIR]) AC_SUBST([SYSTEMDDIR]) AC_SUBST([LOGDIR]) AC_SUBST([DEFAULTADMGROUP]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR], ["$(eval echo ${DEFAULT_CONFIG_DIR})"], [Default config directory]) AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE], ["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"], [Default config file]) AC_DEFINE_UNQUOTED([LOGDIR], ["$(eval echo ${LOGDIR})"], [Default logging directory]) AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE], ["$(eval echo ${LOGDIR}/kronosnetd.log)"], [Default log file]) AC_DEFINE_UNQUOTED([RUNDIR], ["$(eval echo ${RUNDIR})"], [Default run directory]) AC_DEFINE_UNQUOTED([SYSLOGFACILITY], [$(eval echo ${SYSLOGFACILITY})], [Default syslog facility]) AC_DEFINE_UNQUOTED([SYSLOGLEVEL], [$(eval echo ${SYSLOGLEVEL})], [Default syslog level]) AC_DEFINE_UNQUOTED([DEFAULTADMGROUP], ["$(eval echo ${DEFAULTADMGROUP})"], [Default admin group]) ## *FLAGS handling ENV_CFLAGS="$CFLAGS" ENV_CPPFLAGS="$CPPFLAGS" ENV_LDFLAGS="$LDFLAGS" # debug build stuff if test "x${enable_debug}" = xyes; then AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code]) OPT_CFLAGS="-O0" else OPT_CFLAGS="-O3" fi # gdb flags if test "x${GCC}" = xyes; then GDB_FLAGS="-ggdb3" else GDB_FLAGS="-g" fi # extra warnings EXTRA_WARNINGS="" WARNLIST=" all shadow missing-prototypes missing-declarations strict-prototypes declaration-after-statement pointer-arith write-strings cast-align bad-function-cast missing-format-attribute format=2 format-security format-nonliteral no-long-long unsigned-char gnu89-inline no-strict-aliasing error address cpp overflow parentheses sequence-point switch uninitialized unused-but-set-variable unused-function unused-result unused-value unused-variable " for j in $WARNLIST; do if cc_supports_flag -W$j; then EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j"; fi done CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \ $EXTRA_WARNINGS $WERROR_CFLAGS" CPPFLAGS="$ENV_CPPFLAGS" LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed" AC_CONFIG_FILES([ Makefile init/Makefile libtap/Makefile libtap/libtap.pc kronosnetd/Makefile kronosnetd/kronosnetd.logrotate libknet/Makefile libknet/libknet.pc libknet/tests/Makefile docs/Makefile poc-code/Makefile poc-code/iov-hash/Makefile poc-code/access-list/Makefile ]) AC_OUTPUT diff --git a/libknet/compat.c b/libknet/compat.c index e4d0e096..a7b5c244 100644 --- a/libknet/compat.c +++ b/libknet/compat.c @@ -1,193 +1,159 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Author: Jan Friesse * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include "compat.h" -#ifndef HAVE_SENDMMSG -int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags) -{ -#ifdef SYS_sendmmsg - /* - * For systems where kernel supports sendmmsg but glibc doesn't (RHEL 6) - */ - return (syscall(SYS_sendmmsg, sockfd, msgvec, vlen, flags)); -#else - /* - * Generic implementation of sendmmsg using sendmsg - */ - unsigned int i; - ssize_t ret; - - if (vlen == 0) { - return (0); - } - - for (i = 0; i < vlen; i++) { - ret = sendmsg(sockfd, &msgvec[i].msg_hdr, flags); - if (ret >= 0) { - msgvec[i].msg_len = ret; - } else { - break ; - } - } - - return ((ret >= 0) ? vlen : ret); -#endif -} -#endif - #ifndef HAVE_RECVMMSG extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags, struct timespec *timeout) { #ifdef SYS_recvmmsg /* * For systems where kernel supports recvmmsg but glibc doesn't (RHEL 6) */ return (syscall(SYS_recvmmsg, sockfd, msgvec, vlen, flags, timeout)); #else /* * Generic implementation of recvmmsg using recvmsg */ unsigned int i; ssize_t ret; if (vlen == 0) { return (0); } if ((timeout != NULL) || (flags & MSG_WAITFORONE)) { /* * Not implemented */ errno = EINVAL; return (-1); } for (i = 0; i < vlen; i++) { ret = recvmsg(sockfd, &msgvec[i].msg_hdr, flags); if (ret >= 0) { msgvec[i].msg_len = ret; } else { if (ret == -1 && errno == EAGAIN) { ret = 0; } break ; } } return ((ret >= 0) ? i : ret); #endif } #endif #ifndef HAVE_SYS_EPOLL_H #ifdef HAVE_KEVENT /* for FreeBSD which has kevent instead of epoll */ #include #include #include #include static int32_t _poll_to_filter_(int32_t event) { int32_t out = 0; if (event & POLLIN) out |= EVFILT_READ; if (event & POLLOUT) out |= EVFILT_WRITE; return out; } int epoll_create(int size) { return kqueue(); } int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { int ret = 0; struct kevent ke; short filters = _poll_to_filter_(event->events); switch (op) { /* The kevent man page says that EV_ADD also does MOD */ case EPOLL_CTL_ADD: case EPOLL_CTL_MOD: EV_SET(&ke, fd, filters, EV_ADD | EV_ENABLE, 0, 0, event->data.ptr); break; case EPOLL_CTL_DEL: EV_SET(&ke, fd, filters, EV_DELETE, 0, 0, event->data.ptr); break; default: errno = EINVAL; return -1; } ret = kevent(epfd, &ke, 1, NULL, 0, NULL); return ret; } int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms) { struct kevent kevents[maxevents]; struct timespec timeout = { 0, 0 }; struct timespec *timeout_ptr = &timeout; uint32_t revents; int event_count; int i; int returned_events; if (timeout_ms != -1) { timeout.tv_sec = timeout_ms/1000; timeout.tv_nsec += (timeout_ms % 1000) * 1000000ULL; } else { timeout_ptr = NULL; } event_count = kevent(epfd, NULL, 0, kevents, maxevents, timeout_ptr); if (event_count == -1) { return -1; } returned_events = 0; for (i = 0; i < event_count; i++) { revents = 0; if (kevents[i].flags & EV_ERROR) { revents |= POLLERR; } if (kevents[i].flags & EV_EOF) { revents |= POLLHUP; } if (kevents[i].filter == EVFILT_READ) { revents |= POLLIN; } if (kevents[i].filter == EVFILT_WRITE) { revents |= POLLOUT; } events[returned_events].events = revents; events[returned_events].data.ptr = kevents[i].udata; returned_events++; } return returned_events; } #endif /* HAVE_KEVENT */ #endif /* HAVE_SYS_EPOLL_H */ diff --git a/libknet/compat.h b/libknet/compat.h index 05d6116b..48297869 100644 --- a/libknet/compat.h +++ b/libknet/compat.h @@ -1,73 +1,66 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Jan Friesse * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __COMPAT_H__ #define __COMPAT_H__ #include "config.h" #include #include /* FreeBSD has recvmmsg but it's a buggy wrapper */ #ifdef __FreeBSD__ #define recvmmsg COMPAT_recvmmsg -#define sendmmsg COMPAT_sendmmsg #undef HAVE_RECVMMSG -#undef HAVE_SENDMMSG #endif #ifndef MSG_WAITFORONE #define MSG_WAITFORONE 0x10000 #endif -#ifndef HAVE_SENDMMSG -extern int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags); -#endif - #ifndef HAVE_RECVMMSG extern int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, unsigned int flags, struct timespec *timeout); #endif #ifndef ETIME #define ETIME ETIMEDOUT #endif #ifdef HAVE_SYS_EPOLL_H #include #else #ifdef HAVE_KEVENT #include #define EPOLL_CTL_ADD 1 #define EPOLL_CTL_MOD 2 #define EPOLL_CTL_DEL 3 #define EPOLLIN POLLIN #define EPOLLOUT POLLOUT typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout_ms); #endif /* HAVE_KEVENT */ #endif /* HAVE_SYS_EPOLL_H */ #endif /* __COMPAT_H__ */ diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 4cad4374..23962c8d 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,609 +1,609 @@ /* * Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "compat.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0; struct knet_mmsghdr *cur; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { sent_msgs = 0; prev_sent = 0; progress = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; msg_idx++; } retry: cur = &msg[prev_sent]; - sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, - (struct mmsghdr *)&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); + sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, + &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX]; uint8_t frag_idx; unsigned int temp_data_mtu; int host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; seq_num_t tx_seq_num; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; int msgs_to_send, msg_idx; inbuf = knet_h->recv_from_sock_buf[buf_idx]; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unrechable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. */ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); goto out_unlock; } knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining * the knet instance. seq_num 0 will clear the buffers in the RX * thread */ if (knet_h->tx_seq_num == 0) { knet_h->tx_seq_num++; } /* * cache the value in locked context */ tx_seq_num = knet_h->tx_seq_num; inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* * forcefully broadcast a ping to all nodes every SEQ_MAX / 8 * pckts. * this solves 2 problems: * 1) on TX socket overloads we generate extra pings to keep links alive * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2, * node 3+ will be able to keep in sync on the TX seq_num even without * receiving traffic or pings in betweens. This avoids issues with * rollover of the circular buffer */ if (tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } if (inbuf->khp_data_frag_num > 1) { while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; } else { iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); frag_len = frag_len - temp_data_mtu; frag_idx++; } } else { iov_out[frag_idx].iov_base = (void *)inbuf; iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } if (knet_h->crypto_instance) { frag_idx = 0; while (frag_idx < inbuf->khp_data_frag_num) { if (crypto_encrypt_and_sign( knet_h, (const unsigned char *)iov_out[frag_idx].iov_base, iov_out[frag_idx].iov_len, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx].iov_len = outlen; frag_idx++; } } memset(&msg, 0, sizeof(msg)); msgs_to_send = inbuf->khp_data_frag_num; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx]; msg[msg_idx].msg_hdr.msg_iovlen = 1; msg_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } else { for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct knet_mmsghdr *msg, int type) { ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; inlen = readv(sockfd, &iov_in, 1); if (inlen <= 0) { savederrno = errno; docallback = 1; goto out; } msg_recv = 1; knet_h->recv_from_sock_buf[0]->kh_type = type; _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); } else { msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { inlen = msg_recv; savederrno = errno; docallback = 1; goto out; } for (i = 0; i < msg_recv; i++) { inlen = msg[i].msg_len; if (inlen == 0) { savederrno = 0; docallback = 1; goto out; break; } knet_h->recv_from_sock_buf[i]->kh_type = type; _parse_recv_from_sock(knet_h, i, inlen, channel, 0); } } out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct knet_mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, nev, type; int8_t channel; memset(&msg, 0, sizeof(msg)); /* preparing data buffer */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } _handle_send_to_links(knet_h, events[i].data.fd, channel, &msg[0], type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } diff --git a/libknet/transport_common.c b/libknet/transport_common.c index a9eb59e7..d56d4e0e 100644 --- a/libknet/transport_common.c +++ b/libknet/transport_common.c @@ -1,403 +1,428 @@ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" +/* + * reuse Jan Friesse's compat layer as wrapper to drop usage of sendmmsg + * + * TODO: kill those wrappers once we work on packet delivery guaranteed + */ + +int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags) +{ + int savederrno = 0, err = 0; + unsigned int i; + + for (i = 0; i < vlen; i++) { + err = sendmsg(sockfd, &msgvec[i].msg_hdr, flags); + savederrno = errno; + if (err >= 0) { + msgvec[i].msg_len = err; + } else { + break; + } + } + + errno = savederrno; + return ((err >= 0) ? vlen : err); +} + int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; int value; if (_fdset_cloexec(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s", type, strerror(savederrno)); goto exit_error; } if (_fdset_nonblock(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s", type, strerror(savederrno)); goto exit_error; } value = KNET_RING_RCVBUFF; #ifdef SO_RCVBUFFORCE if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s", type, strerror(savederrno)); goto exit_error; } #else if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_RECVBUF: %s", type, strerror(savederrno)); goto exit_error; } #endif value = KNET_RING_RCVBUFF; #ifdef SO_SNDBUFFORCE if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s", type, strerror(savederrno)); goto exit_error; } #else if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s SO_SNDBUF: %s", type, strerror(savederrno)); goto exit_error; } #endif exit_error: errno = savederrno; return err; } int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0, savederrno = 0; int value; if (_configure_common_socket(knet_h, sock, type) < 0) { savederrno = errno; err = -1; goto exit_error; } #ifdef IP_FREEBIND value = 1; if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif #ifdef IP_BINDANY /* BSD */ value = 1; if (setsockopt(sock, IPPROTO_IP, IP_BINDANY, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set BINDANY on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif if (address->ss_family == AF_INET6) { value = 1; if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s", type, strerror(savederrno)); goto exit_error; } #ifdef IPV6_MTU_DISCOVER value = IPV6_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #else value = 1; if (setsockopt(sock, IPPROTO_IPV6, IPV6_DONTFRAG, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif } else { #ifdef IP_MTU_DISCOVER value = IP_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #else value = 1; if (setsockopt(sock, IPPROTO_IP, IP_DONTFRAG, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set DONTFRAG on %s socket: %s", type, strerror(savederrno)); goto exit_error; } #endif } value = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s", type, strerror(savederrno)); goto exit_error; } exit_error: errno = savederrno; return err; } int _init_socketpair(knet_handle_t knet_h, int *sock) { int err = 0, savederrno = 0; int i; if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s", strerror(savederrno)); goto exit_fail; } for (i = 0; i < 2; i++) { if (_configure_common_socket(knet_h, sock[i], "local socketpair") < 0) { savederrno = errno; err = -1; goto exit_fail; } } exit_fail: errno = savederrno; return err; } void _close_socketpair(knet_handle_t knet_h, int *sock) { int i; for (i = 0; i < 2; i++) { if (sock[i]) { close(sock[i]); sock[i] = 0; } } } /* * must be called with global read lock * * return -1 on error * return 0 if fd is invalid * return 1 if fd is valid */ int _is_valid_fd(knet_handle_t knet_h, int sockfd) { int ret = 0; if (sockfd < 0) { errno = EINVAL; return -1; } if (sockfd > KNET_MAX_FDS) { errno = EINVAL; return -1; } if (knet_h->knet_transport_fd_tracker[sockfd].transport >= KNET_MAX_TRANSPORTS) { ret = 0; } else { ret = 1; } return ret; } /* * must be called with global write lock */ int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data) { if (sockfd < 0) { errno = EINVAL; return -1; } if (sockfd > KNET_MAX_FDS) { errno = EINVAL; return -1; } knet_h->knet_transport_fd_tracker[sockfd].transport = transport; knet_h->knet_transport_fd_tracker[sockfd].data_type = data_type; knet_h->knet_transport_fd_tracker[sockfd].data = data; return 0; } /* * public api */ int knet_handle_get_transport_list(knet_handle_t knet_h, struct transport_info *transport_list, size_t *transport_list_entries) { int err = 0, savederrno = 0; int i, count; if (!knet_h) { errno = EINVAL; return -1; } if (!transport_list) { errno = EINVAL; return -1; } if (!transport_list_entries) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } count = 0; /* * we could potentially build this struct * at knet_handle_new init time, but * let's keep it dynamic in case at somepoint * we need to init transports dynamically * at runtime vs init time. */ for (i=0; itransport_ops[i]) { transport_list[count].name = knet_h->transport_ops[i]->transport_name; transport_list[count].id = knet_h->transport_ops[i]->transport_id; count++; } } *transport_list_entries = count; pthread_rwlock_unlock(&knet_h->global_rwlock); return err; } const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport) { int savederrno = 0; const char *name = NULL; if (!knet_h) { errno = EINVAL; return name; } if (transport >= KNET_MAX_TRANSPORTS) { errno = EINVAL; return name; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return name; } if (knet_h->transport_ops[transport]) { name = knet_h->transport_ops[transport]->transport_name; } else { savederrno = ENOENT; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return name; } uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name) { int savederrno = 0; uint8_t err = KNET_MAX_TRANSPORTS; int i; if (!knet_h) { errno = EINVAL; return err; } if (!name) { errno = EINVAL; return err; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return err; } for (i=0; itransport_ops[i]) { if (!strcmp(knet_h->transport_ops[i]->transport_name, name)) { err = knet_h->transport_ops[i]->transport_id; break; } } } if (err == KNET_MAX_TRANSPORTS) { savederrno = EINVAL; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/transports.h b/libknet/transports.h index abc7c7fe..e47f7128 100644 --- a/libknet/transports.h +++ b/libknet/transports.h @@ -1,24 +1,26 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __TRANSPORTS_H__ #define __TRANSPORTS_H__ knet_transport_ops_t *get_udp_transport(void); knet_transport_ops_t *get_sctp_transport(void); int _configure_common_socket(knet_handle_t knet_h, int sock, const char *type); int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type); int _init_socketpair(knet_handle_t knet_h, int *sock); void _close_socketpair(knet_handle_t knet_h, int *sock); int _set_fd_tracker(knet_handle_t knet_h, int sockfd, uint8_t transport, uint8_t data_type, void *data); int _is_valid_fd(knet_handle_t knet_h, int sockfd); +int _sendmmsg(int sockfd, struct knet_mmsghdr *msgvec, unsigned int vlen, unsigned int flags); + #endif