Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index 3048ca77..53dd339d 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,73 +1,76 @@
#
# Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
# File name of the exported-symbols list; shipped in the dist tarball and
# handed to the linker as the version script (see libknet_la_LDFLAGS below).
SYMFILE = libknet_exported_syms
EXTRA_DIST = $(SYMFILE)
# Build this directory first, then tests.
SUBDIRS = . tests
# Passed to libtool through -version-number below.
libversion = 0:0:0
# C sources compiled into libknet.la.
sources = \
common.c \
compat.c \
crypto.c \
handle.c \
host.c \
listener.c \
link.c \
logging.c \
nsscrypto.c \
threads_common.c \
threads_dsthandler.c \
threads_heartbeat.c \
threads_pmtud.c \
- threads_send_recv.c
+ threads_send_recv.c \
+ transport_udp.c \
+ transport_common.c
# Public, installed header.
include_HEADERS = libknet.h
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libknet.pc
# Internal headers: distributed but not installed.
noinst_HEADERS = \
common.h \
compat.h \
crypto.h \
host.h \
internals.h \
link.h \
listener.h \
logging.h \
nsscrypto.h \
onwire.h \
threads_common.h \
threads_dsthandler.h \
threads_heartbeat.h \
threads_pmtud.h \
- threads_send_recv.h
+ threads_send_recv.h \
+ transports.h
lib_LTLIBRARIES = libknet.la
libknet_la_SOURCES = $(sources)
libknet_la_CFLAGS = $(nss_CFLAGS)
# Relink the library whenever the symbol list changes.
EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE)
libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \
--export-dynamic \
-version-number $(libversion)
libknet_la_LIBADD = $(nss_LIBS) -lrt -lpthread -lm
diff --git a/libknet/handle.c b/libknet/handle.c
index 93bc4add..a8e5b3fa 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1444 +1,1455 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_send_recv.h"
+#include "transports.h"
#include "logging.h"
/*
 * File-scope mutex taken by knet_handle_new() and knet_handle_free()
 * around handle setup and teardown.
 */
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Initialize every rwlock, mutex and condition variable embedded in the
 * handle, in a fixed order.
 *
 * lock_init_done is raised as soon as global_rwlock is ready, before the
 * remaining primitives are initialized.
 *
 * Returns 0 on success. On the first failure it logs the error, sets errno
 * to the pthread error code and returns -1; primitives initialized so far
 * are left as they are.
 */
static int _init_locks(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	knet_h->lock_init_done = 1;

	rc = pthread_rwlock_init(&knet_h->listener_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize listener rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_rwlock_init(&knet_h->host_rwlock, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host rwlock: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->host_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_cond_init(&knet_h->host_cond, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize host conditional mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_cond_init(&knet_h->pmtud_cond, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_mutex_init(&knet_h->tx_mutex, NULL);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
/*
 * Counterpart of _init_locks(): clears lock_init_done and then destroys
 * each synchronization primitive held by the handle. Return codes of the
 * destroy calls are not inspected.
 */
static void _destroy_locks(knet_handle_t knet_h)
{
	knet_h->lock_init_done = 0;

	/* read/write locks */
	pthread_rwlock_destroy(&knet_h->global_rwlock);
	pthread_rwlock_destroy(&knet_h->listener_rwlock);
	pthread_rwlock_destroy(&knet_h->host_rwlock);

	/* host mutex / condition pair */
	pthread_mutex_destroy(&knet_h->host_mutex);
	pthread_cond_destroy(&knet_h->host_cond);

	/* pmtud mutex / condition pair */
	pthread_mutex_destroy(&knet_h->pmtud_mutex);
	pthread_cond_destroy(&knet_h->pmtud_cond);

	pthread_mutex_destroy(&knet_h->tx_mutex);
}
/*
 * Create an AF_UNIX/SOCK_SEQPACKET socketpair in sock[0..1] and configure
 * both ends: close-on-exec, non-blocking, and receive/send buffers forced
 * to KNET_RING_RCVBUFF.
 *
 * Returns 0 on success. On failure returns -1 with errno set to the error
 * of the failing call. If the pair had already been created, both
 * descriptors are closed and sock[] is reset to 0 so that nothing leaks
 * and a later _close_socketpair() on the same array is a no-op.
 */
static int _init_socketpair(knet_handle_t knet_h, int *sock)
{
	int savederrno = 0;
	int value;
	int i;

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sock) != 0) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize socketpair: %s",
			strerror(savederrno));
		/* nothing was created: sock[] must not be touched */
		errno = savederrno;
		return -1;
	}

	for (i = 0; i < 2; i++) {
		if (_fdset_cloexec(sock[i])) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on sock[%d]: %s",
				i, strerror(savederrno));
			goto exit_fail;
		}

		if (_fdset_nonblock(sock[i])) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on sock[%d]: %s",
				i, strerror(savederrno));
			goto exit_fail;
		}

		value = KNET_RING_RCVBUFF;
		if (setsockopt(sock[i], SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set receive buffer on sock[%d]: %s",
				i, strerror(savederrno));
			goto exit_fail;
		}

		value = KNET_RING_RCVBUFF;
		if (setsockopt(sock[i], SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set send buffer on sock[%d]: %s",
				i, strerror(savederrno));
			goto exit_fail;
		}
	}

	return 0;

exit_fail:
	/* the pair exists but is unusable: release both ends */
	for (i = 0; i < 2; i++) {
		close(sock[i]);
		sock[i] = 0;
	}
	errno = savederrno;
	return -1;
}
/*
 * Close both ends of a socketpair array. An entry equal to 0 is treated as
 * "not open" and skipped; every closed entry is reset to 0.
 */
static void _close_socketpair(knet_handle_t knet_h, int *sock)
{
	int end;

	for (end = 0; end < 2; end++) {
		if (sock[end] == 0) {
			continue;
		}
		close(sock[end]);
		sock[end] = 0;
	}
}
/*
 * Create the two internal socketpairs of the handle: hostsockfd first,
 * then dstsockfd.
 *
 * Returns 0 on success, -1 with errno set when either pair cannot be
 * created (the failure is logged).
 */
static int _init_socks(knet_handle_t knet_h)
{
	int err_code;

	if (_init_socketpair(knet_h, knet_h->hostsockfd) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	if (_init_socketpair(knet_h, knet_h->dstsockfd) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	return 0;
}
/*
 * Close the internal socketpairs of the handle: dstsockfd first, then
 * hostsockfd (reverse of the order used by _init_socks()).
 */
static void _close_socks(knet_handle_t knet_h)
{
	int *dst_pair = knet_h->dstsockfd;
	int *host_pair = knet_h->hostsockfd;

	_close_socketpair(knet_h, dst_pair);
	_close_socketpair(knet_h, host_pair);
}
/*
 * Allocate and zero every packet buffer owned by the handle:
 *  - per-fragment send/receive buffers (PCKT_FRAG_MAX of each),
 *  - heartbeat (ping) and PMTUd buffers,
 *  - the crypto counterparts of all of the above.
 *
 * The per-fragment send buffers shrink with the fragment index: entry i
 * is sized for KNET_MAX_PACKET_SIZE split into (i + 1) pieces, plus
 * headers (and crypto padding for the *_crypt variant).
 *
 * Returns 0 on success. On allocation failure logs the error and returns
 * -1 with errno set; buffers allocated so far are NOT released here --
 * _destroy_buffers() frees whatever pointers are set.
 */
static int _init_buffers(knet_handle_t knet_h)
{
	int savederrno = 0;
	int i;
	size_t bufsize;

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE;
		knet_h->send_to_links_buf[i] = calloc(1, bufsize);
		if (!knet_h->send_to_links_buf[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}

		knet_h->recv_from_sock_buf[i] = calloc(1, KNET_DATABUFSIZE);
		if (!knet_h->recv_from_sock_buf[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}

		knet_h->recv_from_links_buf[i] = calloc(1, KNET_DATABUFSIZE);
		if (!knet_h->recv_from_links_buf[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}
	}

	knet_h->pingbuf = calloc(1, KNET_HEADER_PING_SIZE);
	if (!knet_h->pingbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for heartbeat buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pmtudbuf = calloc(1, KNET_PMTUD_SIZE_V6);
	if (!knet_h->pmtudbuf) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
		knet_h->send_to_links_buf_crypt[i] = calloc(1, bufsize);
		if (!knet_h->send_to_links_buf_crypt[i]) {
			savederrno = errno;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
				strerror(savederrno));
			goto exit_fail;
		}
	}

	knet_h->recv_from_links_buf_decrypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_decrypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->recv_from_links_buf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->recv_from_links_buf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pingbuf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pingbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto heartbeat buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	knet_h->pmtudbuf_crypt = calloc(1, KNET_DATABUFSIZE_CRYPT);
	if (!knet_h->pmtudbuf_crypt) {
		savederrno = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
			strerror(savederrno));
		goto exit_fail;
	}

	return 0;

exit_fail:
	errno = savederrno;
	return -1;
}
/*
 * Release every buffer allocated by _init_buffers(). Pointers that were
 * never allocated are passed to free() as-is.
 */
static void _destroy_buffers(knet_handle_t knet_h)
{
	int frag;

	/* per-fragment buffers */
	for (frag = 0; frag < PCKT_FRAG_MAX; frag++) {
		free(knet_h->send_to_links_buf[frag]);
		free(knet_h->recv_from_sock_buf[frag]);
		free(knet_h->send_to_links_buf_crypt[frag]);
		free(knet_h->recv_from_links_buf[frag]);
	}

	/* single-instance buffers */
	free(knet_h->recv_from_links_buf_decrypt);
	free(knet_h->recv_from_links_buf_crypt);
	free(knet_h->pingbuf);
	free(knet_h->pingbuf_crypt);
	free(knet_h->pmtudbuf);
	free(knet_h->pmtudbuf_crypt);
}
/*
 * Create the three epoll instances used by the handle, mark them
 * close-on-exec, and register the read ends of the internal socketpairs:
 *   hostsockfd[0] -> send_to_links_epollfd
 *   dstsockfd[0]  -> dst_link_handler_epollfd
 *
 * Returns 0 on success; on failure logs the error and returns -1 with
 * errno set. Descriptors created before the failure stay open.
 */
static int _init_epolls(knet_handle_t knet_h)
{
	struct epoll_event event;
	int err_code;

	/*
	 * even if the kernel does dynamic allocation with epoll_ctl
	 * we need to reserve one extra for host to host communication
	 */
	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
	if (knet_h->send_to_links_epollfd < 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->recv_from_links_epollfd < 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
	if (knet_h->dst_link_handler_epollfd < 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	if (_fdset_cloexec(knet_h->send_to_links_epollfd) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	if (_fdset_cloexec(knet_h->recv_from_links_epollfd) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->hostsockfd[0];
	if (epoll_ctl(knet_h->send_to_links_epollfd,
		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &event) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	memset(&event, 0, sizeof(event));
	event.events = EPOLLIN;
	event.data.fd = knet_h->dstsockfd[0];
	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &event) != 0) {
		err_code = errno;
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
			strerror(err_code));
		errno = err_code;
		return -1;
	}

	return 0;
}
/*
 * Tear down the epoll side of the handle: unregister and close every data
 * socket still marked in_use, unregister the internal socketpair ends, and
 * close the three epoll descriptors.
 *
 * The handle is zero-filled at allocation, so an epoll descriptor that is
 * still 0 (or negative after a failed epoll_create) was never opened by
 * us. Those are skipped: closing them would otherwise close fd 0 of the
 * process when knet_handle_new() fails before _init_epolls() has run.
 *
 * NOTE(review): for in_use entries with is_created == 0 the descriptor in
 * sockfd[0] was supplied by the caller of knet_handle_add_datafd() and is
 * closed here too, whereas knet_handle_remove_datafd() leaves it open --
 * confirm this asymmetry is intended.
 */
static void _close_epolls(knet_handle_t knet_h)
{
	struct epoll_event ev;
	int i;

	memset(&ev, 0, sizeof(struct epoll_event));

	for (i = 0; i < KNET_DATAFD_MAX; i++) {
		if (knet_h->sockfd[i].in_use) {
			epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev);
			if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) {
				_close_socketpair(knet_h, knet_h->sockfd[i].sockfd);
			}
		}
	}

	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);

	if (knet_h->send_to_links_epollfd > 0) {
		close(knet_h->send_to_links_epollfd);
	}
	if (knet_h->recv_from_links_epollfd > 0) {
		close(knet_h->recv_from_links_epollfd);
	}
	if (knet_h->dst_link_handler_epollfd > 0) {
		close(knet_h->dst_link_handler_epollfd);
	}
}
/*
 * Spawn the internal worker threads of the handle, in this order:
 * pmtud, dst link handler, send-to-links, recv-from-links, heartbeat.
 * Each thread receives the handle as its argument.
 *
 * Returns 0 when all threads are running. On the first pthread_create()
 * failure logs the error and returns -1 with errno set to the pthread
 * error code; threads already started keep running.
 */
static int _start_threads(knet_handle_t knet_h)
{
	int rc;

	rc = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
			    _handle_pmtud_link_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->dst_link_handler_thread, 0,
			    _handle_dst_link_handler_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->send_to_links_thread, 0,
			    _handle_send_to_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->recv_from_links_thread, 0,
			    _handle_recv_from_links_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	rc = pthread_create(&knet_h->heartbt_thread, 0,
			    _handle_heartbt_thread, (void *) knet_h);
	if (rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
			strerror(rc));
		errno = rc;
		return -1;
	}

	return 0;
}
+
+/*
+ * Release the handle-level transport state: fetches the UDP transport ops
+ * through get_udp_transport() and invokes its handle_free callback with
+ * the handle and knet_h->transport.
+ *
+ * NOTE(review): neither the returned ops pointer nor knet_h->transport is
+ * checked for NULL -- confirm both are valid on every path that reaches
+ * knet_handle_free(), including the knet_handle_new() failure path.
+ */
+static void _stop_transports(knet_handle_t knet_h)
+{
+ knet_transport_ops_t *ops = NULL;
+
+ ops = get_udp_transport();
+ ops->handle_free(knet_h, knet_h->transport);
+}
+
/*
 * Stop the internal worker threads of the handle.
 *
 * Threads whose pthread_t field is non-zero are cancelled and joined in
 * this order: heartbeat, send-to-links, recv-from-links, dst link handler
 * and finally pmtud. host_cond is signalled before the first group and
 * pmtud_cond before the pmtud thread is cancelled. Return values of the
 * pthread calls are not checked.
 */
static void _stop_threads(knet_handle_t knet_h)
{
void *retval;
/*
* allow threads to catch on shutdown request
* and release locks before we stop them.
* this isn't the most efficient way to handle it
* but it works good enough for now
*/
sleep(1);
/* signal host_cond under its mutex before the threads are cancelled */
pthread_mutex_lock(&knet_h->host_mutex);
pthread_cond_signal(&knet_h->host_cond);
pthread_mutex_unlock(&knet_h->host_mutex);
if (knet_h->heartbt_thread) {
pthread_cancel(knet_h->heartbt_thread);
pthread_join(knet_h->heartbt_thread, &retval);
}
if (knet_h->send_to_links_thread) {
pthread_cancel(knet_h->send_to_links_thread);
pthread_join(knet_h->send_to_links_thread, &retval);
}
if (knet_h->recv_from_links_thread) {
pthread_cancel(knet_h->recv_from_links_thread);
pthread_join(knet_h->recv_from_links_thread, &retval);
}
if (knet_h->dst_link_handler_thread) {
pthread_cancel(knet_h->dst_link_handler_thread);
pthread_join(knet_h->dst_link_handler_thread, &retval);
}
/* signal pmtud_cond, then wait one more second before cancelling pmtud */
pthread_mutex_lock(&knet_h->pmtud_mutex);
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
sleep(1);
if (knet_h->pmtud_link_handler_thread) {
pthread_cancel(knet_h->pmtud_link_handler_thread);
pthread_join(knet_h->pmtud_link_handler_thread, &retval);
}
}
/*
 * Allocate and fully initialize a new knet handle.
 *
 * host_id           - stored in the handle as the local host id.
 * log_fd            - descriptor stored as the handle's logfd; must be
 *                     >= 0 and below the RLIMIT_NOFILE hard limit.
 * default_log_level - when log_fd is non-zero, must not exceed
 *                     KNET_LOG_DEBUG; applied to every log subsystem when
 *                     log_fd > 0.
 *
 * Initialization order: locks, internal socketpairs, packet buffers,
 * epoll descriptors, worker threads. The whole sequence runs with
 * handle_config_mutex held.
 *
 * Returns the new handle, or NULL with errno set on failure. Failures
 * after the config mutex was acquired are unwound via knet_handle_free().
 */
knet_handle_t knet_handle_new(uint16_t host_id,
			      int log_fd,
			      uint8_t default_log_level)
{
	knet_handle_t knet_h;
	int savederrno = 0;
	struct rlimit cur;

	if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
		return NULL;
	}

	if ((log_fd < 0) || (log_fd >= cur.rlim_max)) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * validate incoming request
	 */
	if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
		errno = EINVAL;
		return NULL;
	}

	/*
	 * allocate handle
	 */
	knet_h = malloc(sizeof(struct knet_handle));
	if (!knet_h) {
		errno = ENOMEM;
		return NULL;
	}
	memset(knet_h, 0, sizeof(struct knet_handle));

	savederrno = pthread_mutex_lock(&handle_config_mutex);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
			strerror(savederrno));
		/*
		 * The config mutex is NOT held here, so the common exit_fail
		 * path (which unlocks it) must not be used. Nothing besides
		 * the bare handle exists yet: release it directly.
		 */
		free(knet_h);
		errno = savederrno;
		return NULL;
	}

	/*
	 * copy config in place
	 */
	knet_h->host_id = host_id;
	knet_h->logfd = log_fd;
	if (knet_h->logfd > 0) {
		memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
	}

	/*
	 * set pmtud default timers
	 */
	knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;

	/*
	 * init main locking structures
	 */
	if (_init_locks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * init sockets
	 */
	if (_init_socks(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * allocate packet buffers
	 */
	if (_init_buffers(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * create epoll fds
	 */
	if (_init_epolls(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	/*
	 * start internal threads
	 */
	if (_start_threads(knet_h)) {
		savederrno = errno;
		goto exit_fail;
	}

	pthread_mutex_unlock(&handle_config_mutex);
	return knet_h;

exit_fail:
	/* knet_handle_free() takes the config mutex itself: drop it first */
	pthread_mutex_unlock(&handle_config_mutex);
	knet_handle_free(knet_h);
	errno = savederrno;
	return NULL;
}
/*
 * Tear down and release a handle created by knet_handle_new().
 *
 * Returns 0 once the handle memory has been freed. Returns -1 with errno
 * set when a lock cannot be taken, when knet_h is NULL (EINVAL), or when
 * the host list is not empty (EBUSY); in those cases nothing is freed.
 *
 * If lock_init_done is not set, only the handle memory is released.
 */
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
/* NOTE(review): knet_h is passed to log_err before the NULL check below -- confirm log_err tolerates a NULL handle */
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h) {
pthread_mutex_unlock(&handle_config_mutex);
errno = EINVAL;
return -1;
}
/* locks were never initialized: skip the full teardown */
if (!knet_h->lock_init_done) {
goto exit_nolock;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
/* refuse to free while the host list is non-empty */
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
/* flag the shutdown, then drop the write lock before the teardown steps below */
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
+ _stop_transports(knet_h);
_stop_threads(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h);
_destroy_locks(knet_h);
exit_nolock:
free(knet_h);
knet_h = NULL;
pthread_mutex_unlock(&handle_config_mutex);
return 0;
}
/*
 * Install the socket-notification callback and its private data on the
 * handle, under the global write lock.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h or
 * sock_notify_fn is NULL, or -1 with errno set to the pthread error when
 * the write lock cannot be taken.
 */
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
				   void *sock_notify_fn_private_data,
				   void (*sock_notify_fn) (
						void *private_data,
						int datafd,
						int8_t channel,
						uint8_t tx_rx,
						int error,
						int errorno))
{
	int lock_rc;

	if ((!knet_h) || (!sock_notify_fn)) {
		errno = EINVAL;
		return -1;
	}

	lock_rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_rc));
		errno = lock_rc;
		return -1;
	}

	knet_h->sock_notify_fn = sock_notify_fn;
	knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
	log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Attach a data descriptor to the handle on a channel.
 *
 * datafd  - in/out. If *datafd > 0 the caller's descriptor is used (it is
 *           switched to close-on-exec and non-blocking). Otherwise a
 *           socketpair is created and *datafd is set to its sockfd[0] end.
 * channel - in/out. If *channel < 0 the first unused channel is picked and
 *           written back; otherwise the given channel must be unused.
 *
 * Requires a sock notify callback to be installed first.
 *
 * Returns 0 on success. Returns -1 with errno set on failure: EINVAL for
 * bad arguments or a missing notify callback, EEXIST when *datafd is
 * already registered, EBUSY when no/that channel is free, or the errno of
 * the failing system call.
 */
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
if (*channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
/* a caller-supplied descriptor may be registered only once */
if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
savederrno = EEXIST;
err = -1;
goto out_unlock;
}
}
}
/*
* auto allocate a channel
*/
if (*channel < 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (!knet_h->sockfd[i].in_use) {
*channel = i;
break;
}
}
/* still negative: every channel is in use */
if (*channel < 0) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
} else {
if (knet_h->sockfd[*channel].in_use) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
}
knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;
if (*datafd > 0) {
int sockopt;
socklen_t sockoptlen = sizeof(sockopt);
if (_fdset_cloexec(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
if (_fdset_nonblock(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
knet_h->sockfd[*channel].sockfd[0] = *datafd;
knet_h->sockfd[*channel].sockfd[1] = 0;
/* a successful SO_TYPE query marks the descriptor as a socket */
if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
knet_h->sockfd[*channel].is_socket = 1;
}
} else {
if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
savederrno = errno;
err = -1;
goto out_unlock;
}
knet_h->sockfd[*channel].is_created = 1;
knet_h->sockfd[*channel].is_socket = 1;
*datafd = knet_h->sockfd[*channel].sockfd[0];
}
/*
* is_created doubles as the index of the polled end: sockfd[0] for a
* caller-supplied descriptor, sockfd[1] for an internally created pair
* (whose sockfd[0] end was handed back through *datafd).
*/
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
/* only a pair created here is closed; a caller-supplied fd stays open */
if (knet_h->sockfd[*channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
}
goto out_unlock;
}
knet_h->sockfd[*channel].in_use = 1;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
/*
 * Detach a data descriptor from the handle.
 *
 * The channel is looked up by matching datafd against sockfd[0] of every
 * in-use entry. Unless the entry is flagged has_error, its polled end is
 * removed from send_to_links_epollfd first. A socketpair created by the
 * library is closed; the channel slot is then zeroed.
 *
 * Returns 0 on success, -1 with errno set otherwise (EINVAL for bad
 * arguments or an unknown datafd, or the errno of the failing call).
 */
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
	int rc = 0, saved = 0;
	int8_t channel = -1;
	int idx;
	struct epoll_event ev;

	if ((!knet_h) || (datafd <= 0)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		if (!knet_h->sockfd[idx].in_use) {
			continue;
		}
		if (knet_h->sockfd[idx].sockfd[0] == datafd) {
			channel = idx;
			break;
		}
	}

	if (channel < 0) {
		saved = EINVAL;
		rc = -1;
		goto done;
	}

	if (!knet_h->sockfd[channel].has_error) {
		memset(&ev, 0, sizeof(ev));

		if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL,
			      knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created],
			      &ev) != 0) {
			saved = errno;
			rc = -1;
			log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(saved));
			goto done;
		}
	}

	if (knet_h->sockfd[channel].is_created) {
		_close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
	}

	memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));

done:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Look up the data descriptor bound to a channel.
 *
 * On success stores sockfd[0] of the channel in *datafd and returns 0.
 * Returns -1 with errno = EINVAL for a NULL handle, an out-of-range
 * channel, a NULL datafd pointer, or a channel that is not in use; or
 * -1 with the pthread error when the read lock cannot be taken.
 */
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
	int rc = 0, saved = 0;

	if ((!knet_h) ||
	    (channel < 0) || (channel >= KNET_DATAFD_MAX) ||
	    (datafd == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		*datafd = knet_h->sockfd[channel].sockfd[0];
	} else {
		saved = EINVAL;
		rc = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Reverse lookup: find the channel whose sockfd[0] equals datafd.
 *
 * Once the read lock is held *channel is reset to -1; it receives the
 * matching index when one is found. Returns 0 on success, -1 with
 * errno = EINVAL for bad arguments or when no in-use channel matches, or
 * -1 with the pthread error when the read lock cannot be taken.
 */
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
	int rc = 0, saved = 0;
	int idx;

	if ((!knet_h) || (datafd <= 0) || (channel == NULL)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	*channel = -1;

	for (idx = 0; idx < KNET_DATAFD_MAX; idx++) {
		if (!knet_h->sockfd[idx].in_use) {
			continue;
		}
		if (knet_h->sockfd[idx].sockfd[0] == datafd) {
			*channel = idx;
			break;
		}
	}

	if (*channel < 0) {
		saved = EINVAL;
		rc = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return rc;
}
/*
 * Install (or, with a NULL function pointer, remove) the destination-host
 * filter callback and its private data, under the global write lock.
 *
 * Returns 0 on success, -1 with errno = EINVAL for a NULL handle, or -1
 * with the pthread error when the write lock cannot be taken.
 */
int knet_handle_enable_filter(knet_handle_t knet_h,
			      void *dst_host_filter_fn_private_data,
			      int (*dst_host_filter_fn) (
					void *private_data,
					const unsigned char *outdata,
					ssize_t outdata_len,
					uint8_t tx_rx,
					uint16_t this_host_id,
					uint16_t src_node_id,
					int8_t *channel,
					uint16_t *dst_host_ids,
					size_t *dst_host_ids_entries))
{
	int lock_rc;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	lock_rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_rc));
		errno = lock_rc;
		return -1;
	}

	knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
	knet_h->dst_host_filter_fn = dst_host_filter_fn;

	if (!knet_h->dst_host_filter_fn) {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Enable (1) or disable (0) data forwarding on the handle.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h is
 * NULL or enabled is greater than 1, or -1 with the pthread error when
 * the global write lock cannot be taken.
 */
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
	int savederrno = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	/* enabled is unsigned: only the upper bound can be violated */
	if (enabled > 1) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	knet_h->enabled = enabled;

	if (enabled) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Read the current PMTUd interval of the handle into *interval.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h or
 * interval is NULL, or -1 with the pthread error when the global read
 * lock cannot be taken.
 */
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
	int savederrno = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (!interval) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		/* a read lock is requested here, so report it as such */
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	*interval = knet_h->pmtud_interval;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Set the PMTUd interval of the handle, in seconds.
 *
 * Accepted range is 1..86400. Returns 0 on success, -1 with
 * errno = EINVAL for a NULL handle or an out-of-range interval, or -1
 * with the pthread error when the global write lock cannot be taken.
 */
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
	int lock_rc;

	if ((!knet_h) || (interval == 0) || (interval > 86400)) {
		errno = EINVAL;
		return -1;
	}

	lock_rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_rc));
		errno = lock_rc;
		return -1;
	}

	knet_h->pmtud_interval = interval;
	log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Install (or, with a NULL function pointer, remove) the PMTUd
 * notification callback and its private data, under the global write
 * lock.
 *
 * Returns 0 on success, -1 with errno = EINVAL for a NULL handle, or -1
 * with the pthread error when the write lock cannot be taken.
 */
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
				    void *pmtud_notify_fn_private_data,
				    void (*pmtud_notify_fn) (
						void *private_data,
						unsigned int data_mtu))
{
	int lock_rc;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	lock_rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_rc != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
			strerror(lock_rc));
		errno = lock_rc;
		return -1;
	}

	knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
	knet_h->pmtud_notify_fn = pmtud_notify_fn;

	if (!knet_h->pmtud_notify_fn) {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
	} else {
		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * Read the data MTU currently stored in the handle into *data_mtu.
 *
 * Returns 0 on success. Returns -1 with errno = EINVAL when knet_h or
 * data_mtu is NULL, or -1 with the pthread error when the global read
 * lock cannot be taken.
 */
int knet_handle_pmtud_get(knet_handle_t knet_h,
			  unsigned int *data_mtu)
{
	int savederrno = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (!data_mtu) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		/* a read lock is requested here, so report it as such */
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	*data_mtu = knet_h->data_mtu;

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	return 0;
}
/*
 * (Re)configure crypto on the handle from knet_handle_crypto_cfg.
 *
 * Any existing crypto state is torn down first (crypto_fini), before the
 * new configuration is validated.
 *
 * Returns:
 *    0  crypto configured, or left disabled because the model is "none"
 *       or both cipher and hash are "none";
 *   -1  invalid arguments, lock failure, or private key length outside
 *       [KNET_MIN_KEY_LEN, KNET_MAX_KEY_LEN] (errno set);
 *   -2  crypto_init() failed (errno is set to 0 on this path).
 */
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_crypto_cfg) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/* drop the previous crypto instance unconditionally */
crypto_fini(knet_h);
/* only the first 4 characters are compared, so any value starting with "none" matches */
if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
(!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
err = 0;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %u): %u",
KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %u): %u",
KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
err = crypto_init(knet_h, knet_handle_crypto_cfg);
/* any crypto_init failure is reported to the caller as -2 */
if (err) {
err = -2;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
/*
 * Read up to buff_len bytes from the data descriptor of a channel into
 * buff, using a single-element readv() on sockfd[0] while holding the
 * global read lock.
 *
 * Returns the readv() result. Returns -1 with errno = EINVAL when an
 * argument is invalid (NULL handle/buffer, buff_len of 0 or above
 * KNET_MAX_PACKET_SIZE, channel outside 0..KNET_DATAFD_MAX-1) or the
 * channel is not in use, or -1 with the pthread error when the read lock
 * cannot be taken. errno is set to its value right after readv().
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	int saved = 0;
	ssize_t nread = 0;
	struct iovec vec;

	if ((!knet_h) || (buff == NULL) ||
	    (buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		memset(&vec, 0, sizeof(vec));
		vec.iov_base = (void *)buff;
		vec.iov_len = buff_len;

		nread = readv(knet_h->sockfd[channel].sockfd[0], &vec, 1);
		saved = errno;
	} else {
		saved = EINVAL;
		nread = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return nread;
}
/*
 * Write buff_len bytes from buff to the data descriptor of a channel,
 * using a single-element writev() on sockfd[0] while holding the global
 * read lock.
 *
 * Returns the writev() result. Returns -1 with errno = EINVAL when an
 * argument is invalid (NULL handle/buffer, buff_len of 0 or above
 * KNET_MAX_PACKET_SIZE, channel outside 0..KNET_DATAFD_MAX-1) or the
 * channel is not in use, or -1 with the pthread error when the read lock
 * cannot be taken. errno is set to its value right after writev().
 */
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int saved = 0;
	ssize_t nwritten = 0;
	struct iovec vec[1];

	if ((!knet_h) || (buff == NULL) ||
	    (buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved != 0) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		memset(vec, 0, sizeof(vec));
		vec[0].iov_base = (void *)buff;
		vec[0].iov_len = buff_len;

		nwritten = writev(knet_h->sockfd[channel].sockfd[0], vec, 1);
		saved = errno;
	} else {
		saved = EINVAL;
		nwritten = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = saved;
	return nwritten;
}
diff --git a/libknet/internals.h b/libknet/internals.h
index b5ba26fd..95db8994 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,201 +1,234 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __INTERNALS_H__
#define __INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include "libknet.h"
#include "onwire.h"
/*
 * Buffer sizing and internal limits.
 *
 * The composite sizes are wrapped in parentheses so that they expand as a
 * single value when used inside a larger expression
 * (for example n * KNET_DATABUFSIZE).
 */
#define KNET_DATABUFSIZE (KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE)
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT (KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD)
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX
+typedef void *knet_transport_link_t; /* per link transport handle */
+typedef void *knet_transport_t; /* per knet_h transport handle */
+struct knet_transport_ops; /* Forward declaration because of a circular dependency: struct knet_handle points to it while its callbacks take knet_handle_t and struct knet_link */
+
+
/*
 * One listener entry: a socket together with an address.
 * Entries are chained through 'next'; struct knet_handle keeps the
 * start of the chain in listener_head.
 */
struct knet_listener {
int sock; /* presumably the listening socket fd - confirm against listener.c */
struct sockaddr_storage address; /* address associated with sock */
struct knet_listener *next; /* next entry in the chain */
};
/*
 * One link towards a host. struct knet_host embeds KNET_MAX_LINK of these.
 * Fields are grouped as: required addresses, user configurable values,
 * status, and internal state.
 */
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned int latency_fix; /* precision */
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
/* status */
struct knet_link_status status;
/* internals */
uint8_t link_id;
+ knet_transport_link_t transport; /* per link transport handle */
+ int outsock; /* presumably the socket used for outgoing traffic on this link - TODO confirm */
int listener_sock;
unsigned int configured:1; /* set to 1 if src/dst have been configured */
unsigned int remoteconnected:1; /* link is enabled for data (peer view) */
unsigned int donnotremoteupdate:1; /* define source of the update */
unsigned int host_info_up_sent:1; /* 0 if we need to notify remote that link is up */
unsigned int latency_exp;
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
uint8_t has_valid_mtu;
};
#define KNET_CBUFFER_SIZE 4096 /* size of each circular buffer array in struct knet_host */
/*
 * Reassembly state for one fragmented packet: the data buffer plus the
 * bookkeeping about which fragments have arrived so far.
 * struct knet_host holds KNET_MAX_LINK of these in defrag_buf[].
 */
struct knet_host_defrag_buf {
char buf[KNET_MAX_PACKET_SIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */
uint8_t last_first; /* special case if we receive the last fragment first */
uint16_t frag_size; /* normal frag size (not the last one) */
uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
/*
 * One host known to a knet handle. Hosts are chained through 'next';
 * struct knet_handle keeps host_head/host_tail and a host_index[] array
 * of pointers to this type.
 */
struct knet_host {
/* required */
uint16_t host_id;
/* configurable */
uint8_t link_handler_policy; /* presumably one of KNET_LINK_POLICY_* - confirm against host.c */
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/* internals */
char bcast_circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t bcast_seq_num_rx;
char ucast_circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t ucast_seq_num_tx;
seq_num_t ucast_seq_num_rx;
/* defrag/reassembly buffers */
struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK];
char bcast_circular_buffer_defrag[KNET_CBUFFER_SIZE];
char ucast_circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next; /* next host in the chain */
};
/*
 * One data channel slot. struct knet_handle holds KNET_DATAFD_MAX of
 * these in sockfd[], indexed by channel number.
 */
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's in use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
/*
 * Internal state behind the opaque knet_handle_t exposed by libknet.h:
 * data channel sockets, host list, transport hooks, packet buffers,
 * worker threads, locks, crypto state and the user supplied callbacks.
 */
struct knet_handle {
uint16_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX]; /* data channels, indexed by channel number */
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int hostsockfd[2];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
unsigned int pmtud_interval;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
struct knet_host *host_head;
struct knet_host *host_tail;
struct knet_host *host_index[KNET_MAX_HOST];
+ knet_transport_t transport; /* per knet_h transport handle */
+ struct knet_transport_ops *transport_ops; /* function table of the transport */
uint16_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_listener *listener_head;
struct knet_header *recv_from_sock_buf[PCKT_FRAG_MAX];
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_FRAG_MAX];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
int lock_init_done;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_rwlock_t listener_rwlock; /* listener add/rm lock, can switch to mutex? */
pthread_rwlock_t host_rwlock; /* send_host_info lock, can switch to mutex? */
pthread_mutex_t host_mutex; /* host mutex for cond wait on pckt send, switch to mutex/sync_send ? */
pthread_cond_t host_cond; /* conditional for above */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex;
struct crypto_instance *crypto_instance;
uint16_t sec_header_size;
uint16_t sec_block_size;
uint16_t sec_hash_size;
uint16_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
seq_num_t bcast_seq_num_tx;
/* user callbacks, each stored next to its private data pointer */
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
uint16_t this_host_id,
uint16_t src_node_id,
int8_t *channel,
uint16_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
uint16_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
};
+typedef void *knet_transport_link_t; /* per link transport handle; NOTE(review): repeats the typedef near the top of this header, identical typedef redefinition is only valid since C11 */
+typedef void *knet_transport_t; /* per knet_h transport handle; NOTE(review): same duplicate as the line above */
+
+typedef struct knet_transport_ops { /* function table describing one transport; struct knet_handle points to it via transport_ops */
+
+ int (*handle_allocate)(knet_handle_t knet_h, knet_transport_t *transport); /* per handle hooks */
+ int (*handle_free)(knet_handle_t knet_h, knet_transport_t transport);
+ int (*handle_fd_eof)(knet_handle_t knet_h, int sockfd);
+ int (*handle_fd_notification)(knet_handle_t knet_h, int sockfd, struct iovec *iov, size_t iovlen);
+
+ int (*link_allocate)(knet_handle_t knet_h, knet_transport_t transport, /* per link hooks */
+ struct knet_link *link,
+ knet_transport_link_t *transport_link,
+ uint8_t link_id, struct sockaddr_storage *src_address,
+ struct sockaddr_storage *dst_address, int *listen_sock);
+ int (*link_listener_start)(knet_handle_t knet_h, knet_transport_link_t transport_link,
+ uint8_t link_id,
+ struct sockaddr_storage *address, struct sockaddr_storage *dst_address);
+ int (*link_free)(knet_transport_link_t transport_link);
+ int (*link_get_mtu_overhead)(knet_transport_link_t transport_link); /* presumably the per packet overhead of the transport in bytes - TODO confirm */
+
+ const char *transport_name; /* name string of the transport */
+} knet_transport_ops_t;
+
#endif
diff --git a/libknet/libknet.h b/libknet/libknet.h
index e4499e2c..520b270d 100644
--- a/libknet/libknet.h
+++ b/libknet/libknet.h
@@ -1,1322 +1,1324 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __LIBKNET_H__
#define __LIBKNET_H__
#include <stdint.h>
#include <netinet/in.h>
/*
* libknet limits
*/
/*
* Maximum number of hosts
*/
#define KNET_MAX_HOST 65536
/*
* Maximum number of links between 2 hosts
*/
#define KNET_MAX_LINK 8
/*
* Maximum packet size that should be written to datafd
* see knet_handle_new for details
*/
#define KNET_MAX_PACKET_SIZE 65536
/*
* Buffers used for pretty logging
* host is used to store both ip addresses and hostnames
*/
#define KNET_MAX_HOST_LEN 64
#define KNET_MAX_PORT_LEN 6
/*
* Some notifications can be generated either on TX or RX
*/
#define KNET_NOTIFY_TX 0
#define KNET_NOTIFY_RX 1
typedef struct knet_handle *knet_handle_t;
/*
* Handle structs/API calls
*/
/*
* knet_handle_new
*
* host_id - Each host in a knet is identified with a unique
* ID. when creating a new handle local host_id
* must be specified (0 to UINT16_MAX are all valid).
* It is the user's responsibility to check that the value
* is unique, or bad things might happen.
*
* log_fd - Write file descriptor. If set to a value > 0, it will be used
* to write log packets (see below) from libknet to the application.
* Setting to 0 will disable logging from libknet.
* It is possible to enable logging at any given time (see logging API
* below).
* Make sure to either read from this filedescriptor properly and/or
* mark it O_NONBLOCK, otherwise if the fd becomes full, libknet could
* block.
*
* default_log_level -
* If logfd is specified, it will initialize all subsystems to log
* at default_log_level value. (see logging API below)
*
* on success, a new knet_handle_t is returned.
* on failure, NULL is returned and errno is set.
*/
knet_handle_t knet_handle_new(uint16_t host_id,
int log_fd,
uint8_t default_log_level);
/*
* knet_handle_free
*
* knet_h - pointer to knet_handle_t
*
* Destroy a knet handle, free all resources
*
* knet_handle_free returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_free(knet_handle_t knet_h);
/*
* knet_handle_enable_sock_notify
*
* knet_h - pointer to knet_handle_t
*
* sock_notify_fn_private_data
* void pointer to data that can be used to identify
* the callback.
*
* sock_notify_fn
* A callback function that is invoked every time
* a socket in the datafd pool will report an error (-1)
* or an end of read (0) (see socket.7).
* This function MUST NEVER block or add substantial delays.
* The callback is invoked in an internal unlocked area
* to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd
* to swap/replace the bad fd.
* if both err and errno are 0, it means that the socket
* has received a 0 byte packet (EOF?).
* The callback function must either remove the fd from knet
* (by calling knet_handle_remove_datafd()) or dup a new fd in its place.
* Failure to do this can cause problems.
*
* knet_handle_enable_sock_notify returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno)); /* sorry! can't call it errno ;) */
/*
* knet_handle_add_datafd
*
* IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify
* _MUST_ be set and be able to handle both errors (-1) and
* 0 bytes read / write from the provided datafd.
* On read error (< 0) from datafd, the socket is automatically
* removed from polling to avoid spinning on dead sockets.
* It is safe to call knet_handle_remove_datafd even on sockets
* that have been removed.
*
* knet_h - pointer to knet_handle_t
*
* *datafd - read/write file descriptor.
* knet will read data here to send to the other hosts
* and will write data received from the network.
* Each data packet can be of max size KNET_MAX_PACKET_SIZE!
* Applications using knet_send/knet_recv will receive a
* proper error if the packet size is not within boundaries.
* Applications using their own functions to write to the
* datafd should NOT write more than KNET_MAX_PACKET_SIZE.
*
* Please refer to handle.c on how to set up a socketpair.
*
* datafd can be 0, and knet_handle_add_datafd will create a properly
* populated socket pair the same way as ping_test, or a value
* higher than 0. A negative number will return an error.
* On exit knet_handle_free will take care to cleanup the
* socketpair only if they have been created by knet_handle_add_datafd.
*
* It is possible to pass either sockets or normal fds.
* User provided datafd will be marked as non-blocking and close-on-exit.
*
* *channel - This value has the same effect of VLAN tagging.
* A negative value will auto-allocate a channel.
* Setting a value between 0 and 31 will try to allocate that
* specific channel (unless already in use).
*
* It is possible to add up to 32 datafds but be aware that each
* one of them must have a receiving end on the other host.
*
* Example:
* hostA channel 0 will be delivered to datafd on hostB channel 0
* hostA channel 1 to hostB channel 1.
*
* Each channel must have a unique file descriptor.
*
* If your application could have 2 channels on one host and one
* channel on another host, then you can use dst_host_filter
* to manipulate channel values on TX and RX.
*
* knet_handle_add_datafd returns:
*
* 0 on success
* *datafd will be populated with a socket if the original value was 0
* or if a specific fd was set, the value is untouched.
* *channel will be populated with a channel number if the original value
* was negative or the value is untouched if a specific channel
* was requested.
*
* -1 on error and errno is set.
* *datafd and *channel are untouched or empty.
*/
#define KNET_DATAFD_MAX 32
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel);
/*
* knet_handle_remove_datafd
*
* knet_h - pointer to knet_handle_t
*
* datafd - file descriptor to remove.
* NOTE that if the socket/fd was created by knet_handle_add_datafd,
* the socket will be closed by libknet.
*
* knet_handle_remove_datafd returns:
*
* 0 on success
*
* -1 on error and errno is set.
*/
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd);
/*
* knet_handle_get_channel
*
* knet_h - pointer to knet_handle_t
*
* datafd - get the channel associated to this datafd
*
* *channel - will contain the result
*
* knet_handle_get_channel returns:
*
* 0 on success
* and *channel will contain the result
*
* -1 on error and errno is set.
*/
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel);
/*
* knet_handle_get_datafd
*
* knet_h - pointer to knet_handle_t
*
* channel - get the datafd associated to this channel
*
* *datafd - will contain the result
*
* knet_handle_get_datafd returns:
*
* 0 on success
* and *datafd will contain the results
*
* -1 on error and errno is set.
* and *datafd content is meaningless
*/
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd);
/*
* knet_recv
*
* knet_h - pointer to knet_handle_t
*
* buff - pointer to buffer to store the received data
*
* buff_len - buffer length (1 to KNET_MAX_PACKET_SIZE)
*
* channel - data channel to read from (see knet_handle_add_datafd)
*
* knet_recv is a convenience function to wrap iovec operations
* around a socket. It returns a call to readv(2).
*
* Invalid arguments, or a channel that is not in use, return -1
* with errno set to EINVAL.
*/
ssize_t knet_recv(knet_handle_t knet_h,
char *buff,
const size_t buff_len,
const int8_t channel);
/*
* knet_send
*
* knet_h - pointer to knet_handle_t
*
* buff - pointer to the buffer of data to send
*
* buff_len - length of data to send (1 to KNET_MAX_PACKET_SIZE)
*
* channel - data channel to write to (see knet_handle_add_datafd)
*
* knet_send is a convenience function to wrap iovec operations
* around a socket. It returns a call to writev(2).
*
* Invalid arguments, or a channel that is not in use, return -1
* with errno set to EINVAL.
*/
ssize_t knet_send(knet_handle_t knet_h,
const char *buff,
const size_t buff_len,
const int8_t channel);
/*
* knet_send_sync
*
* knet_h - pointer to knet_handle_t
*
* buff - pointer to the buffer of data to send
*
* buff_len - length of data to send
*
* channel - data channel to use (see knet_handle_add_datafd)
*
* All knet RX/TX operations are async for performance reasons.
* There are applications that might need a sync version of data
* transmission and receive errors in case of failure to deliver
* to another host.
* knet_send_sync bypasses the whole TX async layer and delivers
* data directly to the link layer, and returns errors accordingly.
* knet_send_sync allows to send only one packet to one host at
* a time. It does NOT support multiple destinations or multicast
* packets. Decision is still based on dst_host_filter_fn.
*
* knet_send_sync returns 0 on success and -1 on error.
*
* In addition to normal sendmmsg errors, knet_send_sync can fail
* due to:
*
* ECANCELED - data forward is disabled
* EFAULT - dst_host_filter fatal error
* EINVAL - dst_host_filter did not provide
* dst_host_ids_entries on unicast pckts
* E2BIG - dst_host_filter did return more than one
* dst_host_ids_entries on unicast pckts
* ENOMSG - received unknown message type
* EHOSTDOWN - unicast pckt cannot be delivered because
* dest host is not connected yet
* ECHILD - crypto failed
* EAGAIN - sendmmsg was unable to send all messages and
* there was no progress during retry
*/
int knet_send_sync(knet_handle_t knet_h,
const char *buff,
const size_t buff_len,
const int8_t channel);
/*
* knet_handle_enable_filter
*
* knet_h - pointer to knet_handle_t
*
* dst_host_filter_fn_private_data
* void pointer to data that can be used to identify
* the callback.
*
* dst_host_filter_fn -
* is a callback function that is invoked every time
* a packet hits datafd (see knet_handle_new).
* the function allows users to tell libknet where the
* packet has to be delivered.
*
* const unsigned char *outdata - is a pointer to the
* current packet
* ssize_t outdata_len - length of the above data
* uint8_t tx_rx - filter is called on tx or rx
* (see defines below)
* uint16_t this_host_id - host_id processing the packet
* uint16_t src_host_id - host_id that generated the
* packet
* uint16_t *dst_host_ids - array of KNET_MAX_HOST uint16_t
* where to store the destinations
* size_t *dst_host_ids_entries - number of hosts to send the message
*
* dst_host_filter_fn should return
* -1 on error, packet is discarded.
* 0 packet is unicast and should be sent to dst_host_ids and there are
* dst_host_ids_entries in the buffer.
* 1 packet is broadcast/multicast and is sent to all hosts.
* contents of dst_host_ids and dst_host_ids_entries are ignored.
* (see also kronosnetd/etherfilter.* for an example that filters based
* on ether protocol)
*
* knet_handle_enable_filter returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_enable_filter(knet_handle_t knet_h,
void *dst_host_filter_fn_private_data,
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
uint16_t this_host_id,
uint16_t src_host_id,
int8_t *channel,
uint16_t *dst_host_ids,
size_t *dst_host_ids_entries));
/*
* knet_handle_setfwd
*
* knet_h - pointer to knet_handle_t
*
* enable - set to 1 to allow data forwarding, 0 to disable data forwarding.
*
* knet_handle_setfwd returns:
*
* 0 on success
* -1 on error and errno is set.
*
* By default data forwarding is off and no traffic will pass through knet until
* it is set on.
*/
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled);
/*
* knet_handle_pmtud_setfreq
*
* knet_h - pointer to knet_handle_t
*
* interval - define the interval in seconds between PMTUd scans
* range from 1 to 86400 (24h)
*
* knet_handle_pmtud_setfreq returns:
*
* 0 on success
* -1 on error and errno is set.
*
* default interval is 60.
*/
#define KNET_PMTUD_DEFAULT_INTERVAL 60
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval);
/*
* knet_handle_pmtud_getfreq
*
* knet_h - pointer to knet_handle_t
*
* interval - pointer where to store the current interval value
*
* knet_handle_pmtud_getfreq returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval);
/*
* knet_handle_enable_pmtud_notify
*
* knet_h - pointer to knet_handle_t
*
* pmtud_notify_fn_private_data
* void pointer to data that can be used to identify
* the callback.
*
* pmtud_notify_fn
* is a callback function that is invoked every time
* a path MTU size change is detected.
* The function allows libknet to notify the user
* of data MTU, that's the max value that can be sent
* onwire without fragmentation. The data MTU will always
* be lower than real link MTU because it accounts for
* protocol overhead, knet packet header and (if configured)
* crypto overhead.
* This function MUST NEVER block or add substantial delays.
*
* knet_handle_enable_pmtud_notify returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
void *pmtud_notify_fn_private_data,
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu));
/*
* knet_handle_pmtud_get
*
* knet_h - pointer to knet_handle_t
*
* data_mtu - pointer where to store data_mtu (see above)
*
* knet_handle_pmtud_get returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_handle_pmtud_get(knet_handle_t knet_h,
unsigned int *data_mtu);
/*
* knet_handle_crypto
*
* knet_h - pointer to knet_handle_t
*
* knet_handle_crypto_cfg -
* pointer to a knet_handle_crypto_cfg structure
*
* crypto_model should contain the model name.
* Currently only "nss" is supported.
* Setting to "none" will disable crypto.
*
* crypto_cipher_type
* should contain the cipher algo name.
* It can be set to "none" to disable
* encryption.
* Currently supported by "nss" model:
* "3des", "aes128", "aes192" and "aes256".
*
* crypto_hash_type
* should contain the hashing algo name.
* It can be set to "none" to disable
* hashing.
* Currently supported by "nss" model:
* "md5", "sha1", "sha256", "sha384" and "sha512".
*
* private_key will contain the private shared key.
* It has to be at least KNET_MIN_KEY_LEN long.
*
* private_key_len
* length of the provided private_key.
*
* Implementation notes/current limitations:
* - enabling crypto, will increase latency as packets have
* to be processed.
* - enabling crypto might reduce the overall throughput
* due to crypto data overhead.
* - re-keying is not implemented yet.
* - private/public key encryption/hashing is not currently
* planned.
* - crypto key must be the same for all hosts in the same
* knet instance.
* - it is safe to call knet_handle_crypto multiple times at runtime.
* The last config will be used.
* IMPORTANT: a call to knet_handle_crypto can fail due to:
* 1) failure to obtain locking
* 2) errors to initializing the crypto level.
* This can happen even in subsequent calls to knet_handle_crypto.
* A failure in crypto init, might leave your traffic unencrypted!
* It's best to stop data forwarding (see above), change crypto config,
* start forward again.
*
* knet_handle_crypto returns:
*
* 0 on success
* -1 on error and errno is set.
* -2 on crypto subsystem initialization error. No errno is provided at the moment (yet).
*/
#define KNET_MIN_KEY_LEN 1024
#define KNET_MAX_KEY_LEN 4096
/*
 * Crypto configuration passed to knet_handle_crypto.
 */
struct knet_handle_crypto_cfg {
char crypto_model[16]; /* model name: "nss", or "none" to disable crypto */
char crypto_cipher_type[16]; /* cipher algo name, "none" disables encryption */
char crypto_hash_type[16]; /* hashing algo name, "none" disables hashing */
unsigned char private_key[KNET_MAX_KEY_LEN]; /* private shared key */
unsigned int private_key_len; /* length of private_key, KNET_MIN_KEY_LEN to KNET_MAX_KEY_LEN */
};
int knet_handle_crypto(knet_handle_t knet_h,
struct knet_handle_crypto_cfg *knet_handle_crypto_cfg);
/*
* host structs/API calls
*/
/*
* knet_host_add
*
* knet_h - pointer to knet_handle_t
*
* host_id - each host in a knet is identified with a unique ID
* (see also knet_handle_new documentation above)
*
* knet_host_add returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_add(knet_handle_t knet_h, uint16_t host_id);
/*
* knet_host_remove
*
* knet_h - pointer to knet_handle_t
*
* host_id - each host in a knet is identified with a unique ID
* (see also knet_handle_new documentation above)
*
* knet_host_remove returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_remove(knet_handle_t knet_h, uint16_t host_id);
/*
* knet_host_set_name
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* name - this name will be used for pretty logging and eventually
* search for hosts (see also get_name and get_id below).
* Only up to KNET_MAX_HOST_LEN - 1 bytes will be accepted and
* name has to be unique for each host.
*
* knet_host_set_name returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_set_name(knet_handle_t knet_h, uint16_t host_id,
const char *name);
/*
* knet_host_get_name_by_host_id
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* name - pointer to a preallocated buffer of at least size KNET_MAX_HOST_LEN
* where the current host name will be stored
* (as set by knet_host_set_name or default by knet_host_add)
*
* knet_host_get_name_by_host_id returns:
*
* 0 on success
* -1 on error and errno is set (name is left untouched)
*/
int knet_host_get_name_by_host_id(knet_handle_t knet_h, uint16_t host_id,
char *name);
/*
* knet_host_get_id_by_host_name
*
* knet_h - pointer to knet_handle_t
*
* name - name to lookup, max len KNET_MAX_HOST_LEN
*
* host_id - where to store the result
*
* knet_host_get_id_by_host_name returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name,
uint16_t *host_id);
/*
* knet_host_get_host_list
*
* knet_h - pointer to knet_handle_t
*
* host_ids - array of at least KNET_MAX_HOST size
*
* host_ids_entries -
* number of entries written in host_ids
*
* knet_host_get_host_list returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_get_host_list(knet_handle_t knet_h,
uint16_t *host_ids, size_t *host_ids_entries);
/*
* define switching policies
*/
#define KNET_LINK_POLICY_PASSIVE 0
#define KNET_LINK_POLICY_ACTIVE 1
#define KNET_LINK_POLICY_RR 2
/*
* knet_host_set_policy
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* policy - there are currently 3 kinds of simple switching policies
* as defined above, based on link configuration.
* KNET_LINK_POLICY_PASSIVE - the active link with the lowest
* priority will be used.
* if one or more active links share
* the same priority, the one with
* lowest link_id will be used.
*
* KNET_LINK_POLICY_ACTIVE - all active links will be used
* simultaneously to send traffic.
* link priority is ignored.
*
* KNET_LINK_POLICY_RR - round-robin policy, every packet
* will be sent on a different active
* link.
*
* knet_host_set_policy returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_set_policy(knet_handle_t knet_h, uint16_t host_id,
uint8_t policy);
/*
* knet_host_get_policy
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* policy - will contain the current configured switching policy.
* Default is passive when creating a new host.
*
* knet_host_get_policy returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_get_policy(knet_handle_t knet_h, uint16_t host_id,
uint8_t *policy);
/*
* knet_host_enable_status_change_notify
*
* knet_h - pointer to knet_handle_t
*
* host_status_change_notify_fn_private_data
* void pointer to data that can be used to identify
* the callback.
*
* host_status_change_notify_fn
* is a callback function that is invoked every time
* there is a change in the host status.
* host status is identified by:
* - reachable, this host can send/receive data to/from host_id
* - remote, 0 if the host_id is connected locally or 1 if
* there is one or more knet host(s) in between.
* NOTE: re-switching is NOT currently implemented,
* but this is ready for future and can avoid
* an API/ABI breakage later on.
* - external, 0 if the host_id is configured locally or 1 if
* it has been added from remote nodes config.
* NOTE: dynamic topology is NOT currently implemented,
* but this is ready for future and can avoid
* an API/ABI breakage later on.
* This function MUST NEVER block or add substantial delays.
*
* knet_host_enable_status_change_notify returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_enable_status_change_notify(knet_handle_t knet_h,
void *host_status_change_notify_fn_private_data,
void (*host_status_change_notify_fn) (
void *private_data,
uint16_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external));
/*
* define host status structure for quick lookup
* struct is in flux as more stats will be added soon
*
* reachable host_id can be seen either directly connected
* or via another host_id
*
* remote 0 = node is connected locally, 1 is visible
* via another host_id
*
* external 0 = node is configured/known locally,
* 1 host_id has been received via another host_id
*/
struct knet_host_status {
uint8_t reachable; /* host_id can be seen either directly connected or via another host_id */
uint8_t remote; /* 0 = node is connected locally, 1 = visible via another host_id */
uint8_t external; /* 0 = node is configured/known locally, 1 = host_id has been received via another host_id */
/* add host statistics */
};
/*
* knet_host_get_status
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* status - pointer to knet_host_status struct (see above)
*
* knet_host_get_status returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_host_get_status(knet_handle_t knet_h, uint16_t host_id,
struct knet_host_status *status);
/*
* link structs/API calls
*
* every host allocated/managed by knet_host_* has
* KNET_MAX_LINK structures to define the network
* paths that connect 2 hosts.
*
* Each link is identified by a link_id that has a
* value between 0 and KNET_MAX_LINK - 1.
*
* KNOWN LIMITATIONS:
*
* - let's assume the scenario where two hosts are connected
* with any number of links. link_id must match on both sides.
* If host_id 0 link_id 0 is configured to connect IP1 to IP2 and
* host_id 0 link_id 1 is configured to connect IP3 to IP4,
* host_id 1 link_id 0 _must_ connect IP2 to IP1 and likewise
* host_id 1 link_id 1 _must_ connect IP4 to IP3.
* We might be able to lift this restriction in future, by using
* other data to determine src/dst link_id, but for now, deal with it.
*
* -
*/
/*
* knet_link_set_config
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* src_addr - sockaddr_storage that can be either IPv4 or IPv6
*
* dst_addr - sockaddr_storage that can be either IPv4 or IPv6
* this can be null if we don't know the incoming
* IP address/port and the link will remain quiet
* till the node on the other end will initiate a
* connection
*
* knet_link_set_config returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr);
/*
* knet_link_get_config
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* src_addr - sockaddr_storage that can be either IPv4 or IPv6
*
* dst_addr - sockaddr_storage that can be either IPv4 or IPv6
*
* dynamic - 0 if dst_addr is static or 1 if dst_addr is dynamic.
* In case of 1, dst_addr can be NULL and it will be left
* untouched.
*
* knet_link_get_config returns:
*
* 0 on success.
* -1 on error and errno is set.
*/
int knet_link_get_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic);
/*
* knet_link_set_enable
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* enabled - 0 disable the link, 1 enable the link
*
* knet_link_set_enable returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_set_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
unsigned int enabled);
/*
* knet_link_get_enable
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* enabled - set to 0 if the link is disabled, 1 if the link is enabled
*
* knet_link_get_enable returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
unsigned int *enabled);
/*
* knet_link_set_ping_timers
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* interval - specify the ping interval (in milliseconds)
*
* timeout - if no pong is received within this time (in milliseconds),
* the link is declared dead
*
* precision - how many values of latency are used to calculate
* the average link latency (see also get_status below)
*
* knet_link_set_ping_timers returns:
*
* 0 on success
* -1 on error and errno is set.
*/
#define KNET_LINK_DEFAULT_PING_INTERVAL 1000 /* 1 second */
#define KNET_LINK_DEFAULT_PING_TIMEOUT 2000 /* 2 seconds */
#define KNET_LINK_DEFAULT_PING_PRECISION 2048 /* samples */
int knet_link_set_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision);
/*
* knet_link_get_ping_timers
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* interval - ping interval (in milliseconds)
*
* timeout - if no pong is received within this time,
* the link is declared dead
*
* precision - how many values of latency are used to calculate
* the average link latency (see also get_status below)
*
* knet_link_get_ping_timers returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision);
/*
* knet_link_set_pong_count
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* pong_count - how many valid ping/pongs before a link is marked UP.
* default: 5, value should be > 0
*
* knet_link_set_pong_count returns:
*
* 0 on success
* -1 on error and errno is set.
*/
#define KNET_LINK_DEFAULT_PONG_COUNT 5
int knet_link_set_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
uint8_t pong_count);
/*
* knet_link_get_pong_count
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* pong_count - see above
*
* knet_link_get_pong_count returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
uint8_t *pong_count);
/*
* knet_link_set_priority
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* priority - specify the switching priority for this link
* see also knet_host_set_policy
*
* knet_link_set_priority returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_set_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
uint8_t priority);
/*
* knet_link_get_priority
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* priority - gather the switching priority for this link
* see also knet_host_set_policy
*
* knet_link_get_priority returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
uint8_t *priority);
/*
* knet_link_get_link_list
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_ids - array of at least KNET_MAX_LINK size
* with the list of configured links for a certain host.
*
* link_ids_entries -
* number of entries contained in link_ids
*
* knet_link_get_link_list returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_link_list(knet_handle_t knet_h, uint16_t host_id,
uint8_t *link_ids, size_t *link_ids_entries);
/*
* define link status structure for quick lookup
* struct is in flux as more stats will be added soon
*
* src/dst_{ipaddr,port} strings are filled by
* getnameinfo(3) when configuring the link.
* if the link is dynamic (see knet_link_set_config)
* dst_ipaddr/port will contain ipaddr/port of the currently
* connected peer or "Unknown" if it was not possible
* to determine the ipaddr/port at runtime.
*
* enabled see also knet_link_set/get_enable.
*
* connected the link is connected to a peer and ping/pong traffic
* is flowing.
*
* dynconnected the link has dynamic ip on the other end, and
* we can see the other host is sending pings to us.
*
* latency average latency of this link
* see also knet_link_set/get_ping_timers.
*
* pong_last if the link is down, this value tells us how long
* ago this link was active. A value of 0 means that the link
* has never been active.
*/
/* link status snapshot; knet_link_get_status copies the link's status into one of these */
struct knet_link_status {
/* numeric address/port strings, filled by getnameinfo(3) when the link is configured */
char src_ipaddr[KNET_MAX_HOST_LEN];
char src_port[KNET_MAX_PORT_LEN];
char dst_ipaddr[KNET_MAX_HOST_LEN];
char dst_port[KNET_MAX_PORT_LEN];
unsigned int enabled:1; /* link is configured and admin enabled for traffic */
unsigned int connected:1; /* link is connected for data (local view) */
unsigned int dynconnected:1; /* link has been activated by remote dynip */
unsigned long long latency; /* average latency computed by fix/exp */
struct timespec pong_last; /* when the link was last active; 0 means it has never been active */
unsigned int mtu; /* current detected MTU on this link */
unsigned int proto_overhead; /* contains the size of the IP protocol, knet headers and
* crypto headers (if configured). This value is filled in
* ONLY after the first PMTUd run on that given link,
* and can change if link configuration or crypto configuration
* changes at runtime.
* WARNING: in general mtu + proto_overhead might or might
* not match the output of ifconfig mtu due to crypto
* requirements to pad packets to some specific boundaries. */
/* add link statistics */
};
/*
* knet_link_get_status
*
* knet_h - pointer to knet_handle_t
*
* host_id - see above
*
* link_id - see above
*
* status - pointer to knet_link_status struct (see above)
*
* knet_link_get_status returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_link_get_status(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
struct knet_link_status *status);
/*
* logging structs/API calls
*/
/*
* libknet is composed of several subsystems. In order
* to easily distinguish log messages coming from different
* places, each subsystem has its own ID.
*/
-#define KNET_SUB_COMMON 0 /* common.c */
-#define KNET_SUB_HANDLE 1 /* handle.c alloc/dealloc config changes */
-#define KNET_SUB_HOST 2 /* host add/del/modify */
-#define KNET_SUB_LISTENER 3 /* listeners add/del/modify... */
-#define KNET_SUB_LINK 4 /* link add/del/modify */
-#define KNET_SUB_PMTUD 5 /* Path MTU Discovery */
-#define KNET_SUB_SEND_T 6 /* send to link thread */
-#define KNET_SUB_LINK_T 7 /* recv from link thread */
-#define KNET_SUB_SWITCH_T 8 /* switching thread */
-#define KNET_SUB_HB_T 9 /* heartbeat thread */
-#define KNET_SUB_PMTUD_T 10 /* Path MTU Discovery thread */
-#define KNET_SUB_FILTER 11 /* (ether)filter errors */
-#define KNET_SUB_CRYPTO 12 /* crypto.c generic layer */
-#define KNET_SUB_NSSCRYPTO 13 /* nsscrypto.c */
-#define KNET_SUB_LAST KNET_SUB_NSSCRYPTO
-#define KNET_MAX_SUBSYSTEMS KNET_SUB_LAST + 1
+#define KNET_SUB_COMMON 0 /* common.c */
+#define KNET_SUB_HANDLE 1 /* handle.c alloc/dealloc config changes */
+#define KNET_SUB_HOST 2 /* host add/del/modify */
+#define KNET_SUB_LISTENER 3 /* listeners add/del/modify... */
+#define KNET_SUB_LINK 4 /* link add/del/modify */
+#define KNET_SUB_PMTUD 5 /* Path MTU Discovery */
+#define KNET_SUB_SEND_T 6 /* send to link thread */
+#define KNET_SUB_LINK_T 7 /* recv from link thread */
+#define KNET_SUB_SWITCH_T 8 /* switching thread */
+#define KNET_SUB_HB_T 9 /* heartbeat thread */
+#define KNET_SUB_PMTUD_T 10 /* Path MTU Discovery thread */
+#define KNET_SUB_TRANSPORT_T 11 /* Transport common */
+#define KNET_SUB_UDP_LINK_T 12 /* UDP Transport */
+#define KNET_SUB_FILTER 15 /* (ether)filter errors */
+#define KNET_SUB_CRYPTO 16 /* crypto.c generic layer */
+#define KNET_SUB_NSSCRYPTO 17 /* nsscrypto.c */
+#define KNET_SUB_LAST KNET_SUB_NSSCRYPTO
+/* parenthesized so the macro stays a single operand inside larger expressions (e.g. sizeof(x) * KNET_MAX_SUBSYSTEMS) */
+#define KNET_MAX_SUBSYSTEMS (KNET_SUB_LAST + 1)
/*
* Convert between subsystem IDs and names
*/
/*
* knet_log_get_subsystem_name
*
* return internal name of the subsystem or "common"
*/
const char *knet_log_get_subsystem_name(uint8_t subsystem);
/*
* knet_log_get_subsystem_id
*
* return internal ID of the subsystem or KNET_SUB_COMMON
*/
uint8_t knet_log_get_subsystem_id(const char *name);
/*
* 4 log levels are enough for everybody
*/
#define KNET_LOG_ERR 0 /* unrecoverable errors/conditions */
#define KNET_LOG_WARN 1 /* recoverable errors/conditions */
#define KNET_LOG_INFO 2 /* info, link up/down, config changes.. */
#define KNET_LOG_DEBUG 3
/*
* Convert between log level values and names
*/
/*
* knet_log_get_loglevel_name
*
* return internal name of the log level or "ERROR" for unknown values
*/
const char *knet_log_get_loglevel_name(uint8_t level);
/*
* knet_log_get_loglevel_id
*
* return internal log level ID or KNET_LOG_ERR for invalid names
*/
uint8_t knet_log_get_loglevel_id(const char *name);
/*
* every log message is composed by a text message (including a trailing \n)
* and message level/subsystem IDs.
* In order to make debugging easier it is possible to send those packets
* straight to stdout/stderr (see ping_test.c stdout option).
*/
#define KNET_MAX_LOG_MSG_SIZE 256
/*
 * one log record: msg is sized so that the text buffer plus the two
 * uint8_t fields add up to KNET_MAX_LOG_MSG_SIZE bytes.
 */
struct knet_log_msg {
char msg[KNET_MAX_LOG_MSG_SIZE - (sizeof(uint8_t)*2)];
uint8_t subsystem; /* KNET_SUB_* */
uint8_t msglevel; /* KNET_LOG_* */
};
/*
* knet_log_set_loglevel
*
* knet_h - same as above
*
* subsystem - same as above
*
* level - same as above
*
* knet_log_set_loglevel allows fine control of log levels by subsystem.
* See also knet_handle_new for defaults.
*
* knet_log_set_loglevel returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t level);
/*
* knet_log_get_loglevel
*
* knet_h - same as above
*
* subsystem - same as above
*
* level - same as above
*
* knet_log_get_loglevel returns:
*
* 0 on success
* -1 on error and errno is set.
*/
int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t *level);
#endif
diff --git a/libknet/link.c b/libknet/link.c
index b86e17ca..9f70d2a2 100644
--- a/libknet/link.c
+++ b/libknet/link.c
@@ -1,924 +1,952 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include "internals.h"
#include "logging.h"
#include "link.h"
#include "listener.h"
+#include "transports.h"
#include "host.h"
/*
 * _link_updown
 *
 * Store a new enabled/connected pair in the status of one link.
 * When either value actually changes, _host_dstcache_update_sync() is
 * invoked for the owning host, and a link that ends up not connected
 * has its dynconnected flag cleared.
 *
 * No argument validation is done here. Always returns 0.
 */
int _link_updown(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                 unsigned int enabled, unsigned int connected)
{
    struct knet_host *host = knet_h->host_index[host_id];
    struct knet_link *lnk = &host->link[link_id];
    int changed = (lnk->status.enabled != enabled) ||
                  (lnk->status.connected != connected);

    if (changed) {
        lnk->status.enabled = enabled;
        lnk->status.connected = connected;

        _host_dstcache_update_sync(knet_h, host);

        /* a dynamic peer cannot stay "seen" on a link that is not connected */
        if (lnk->status.dynconnected && !lnk->status.connected) {
            lnk->status.dynconnected = 0;
        }
    }

    return 0;
}
/*
* knet_link_set_config
*
* Store the source (and, when given, destination) address of a link that is
* not currently enabled, and resolve them to numeric host/port strings in
* link->status via getnameinfo(3).
* The handle's transport is set up for every link, including dynamic
* ones (dst_addr == NULL), because knet_link_set_enable calls through
* knet_h->transport_ops unconditionally.
* On success the link is marked configured and the default ping/pong/latency
* values are applied.
*
* Returns 0 on success, -1 on error with errno set
* (EINVAL for bad arguments/unknown host/unresolvable address,
* EBUSY if the link is currently enabled).
*/
int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
/* a link must be disabled before it can be reconfigured */
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage));
err = getnameinfo((const struct sockaddr *)src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN,
NI_NUMERICHOST | NI_NUMERICSERV);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
+
+ /*
+ * Set up the transport before the dynamic/static split below, so that
+ * links configured without a destination address get it as well.
+ */
+ knet_h->transport_ops = get_udp_transport();
+
+ /* First time we've used this transport for this handle */
+ if (!knet_h->transport) {
+ knet_h->transport_ops->handle_allocate(knet_h, &knet_h->transport);
+ }
+ if (!knet_h->transport) {
+ savederrno = errno;
+ log_err(knet_h, KNET_SUB_LISTENER, "Failed to allocate transport handle for %s: %s",
+ knet_h->transport_ops->transport_name,
+ strerror(savederrno));
+ err = -1;
+ goto exit_unlock;
+ }
+
if (!dst_addr) {
/* no destination known yet: the peer is expected to connect to us */
link->dynamic = KNET_LINK_DYNIP;
err = 0;
goto exit_unlock;
}
link->dynamic = KNET_LINK_STATIC;
memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage));
err = getnameinfo((const struct sockaddr *)dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN,
NI_NUMERICHOST | NI_NUMERICSERV);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
}
exit_unlock:
if (!err) {
link->configured = 1;
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \
((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
/*
 * knet_link_get_config
 *
 * Copy the stored addresses of a configured link back to the caller.
 * src_addr is always filled. For a static link *dynamic is set to 0 and
 * dst_addr (which must not be NULL) is filled; for a dynamic link
 * *dynamic is set to 1 and dst_addr is not touched.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                         struct sockaddr_storage *src_addr,
                         struct sockaddr_storage *dst_addr,
                         uint8_t *dynamic)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || !src_addr || !dynamic) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    if (lnk->dynamic == KNET_LINK_STATIC) {
        /* a static link has a destination to report, so the buffer is mandatory */
        if (!dst_addr) {
            saved = EINVAL;
            goto out_unlock;
        }
        memmove(src_addr, &lnk->src_addr, sizeof(struct sockaddr_storage));
        *dynamic = 0;
        memmove(dst_addr, &lnk->dst_addr, sizeof(struct sockaddr_storage));
    } else {
        memmove(src_addr, &lnk->src_addr, sizeof(struct sockaddr_storage));
        *dynamic = 1;
    }
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
* knet_link_set_enable
*
* Administratively enable (enabled == 1) or disable (enabled == 0) a
* configured link. Asking for the state the link is already in is a no-op.
*
* Enabling allocates the transport link and adds a listener, then flips
* the link state through _link_updown.
* Disabling sends a LINK_STATUS_DOWN host info to the peer, flips the
* link state and removes the listener; if the listener cannot be removed
* (for a reason other than EBUSY) the link is put back to enabled.
*
* Returns 0 on success, -1 on error with errno set.
*/
int knet_link_set_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
/*
* this read lock might appear as an API violation, but be
* very careful because we cannot use a write lock (yet).
* the _send_host_info requires threads to be operational.
* a write lock here would deadlock.
* a read lock is sufficient as all functions invoked by
* this code are already thread safe.
*/
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
if (enabled) {
+ if (knet_h->transport_ops->link_allocate(
+ knet_h, knet_h->transport,
+ link,
+ &link->transport, link_id,
+ &link->src_addr, &link->dst_addr,
+ &link->outsock) < 0) {
+ /* keep errno from the failing call instead of resetting it to 0 on exit */
+ savederrno = errno;
+ err = -1;
+ log_err(knet_h, KNET_SUB_LINK, "Unable to allocate transport for this link");
+ goto exit_unlock;
+ }
+
/*
* NOTE(review): if _listener_add fails, the transport link allocated
* just above is not released here - confirm whether it needs an
* explicit release on this path.
*/
if (_listener_add(knet_h, host_id, link_id) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_LINK, "Unable to setup listener for this link");
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is enabled",
host_id, link_id);
}
if (!enabled) {
/* tell the peer the link is going down before changing local state */
struct knet_hostinfo knet_hostinfo;
knet_hostinfo.khi_type = KNET_HOSTINFO_TYPE_LINK_UP_DOWN;
knet_hostinfo.khi_bcast = KNET_HOSTINFO_UCAST;
knet_hostinfo.khi_dst_node_id = host_id;
knet_hostinfo.khip_link_status_link_id = link_id;
knet_hostinfo.khip_link_status_status = KNET_HOSTINFO_LINK_STATUS_DOWN;
_send_host_info(knet_h, &knet_hostinfo, KNET_HOSTINFO_LINK_STATUS_SIZE);
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
savederrno = errno;
if ((!err) && (enabled)) {
err = 0;
goto exit_unlock;
}
if (err) {
err = -1;
goto exit_unlock;
}
/* only the disable path gets here */
err = _listener_remove(knet_h, host_id, link_id);
savederrno = errno;
if ((err) && (savederrno != EBUSY)) {
log_err(knet_h, KNET_SUB_LINK, "Unable to remove listener for this link");
if (_link_updown(knet_h, host_id, link_id, 1, link->status.connected)) {
/* force link status the hard way */
link->status.enabled = 1;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is NOT disabled",
host_id, link_id);
err = -1;
goto exit_unlock;
} else {
err = 0;
savederrno = 0;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
link->host_info_up_sent = 0;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
/*
 * knet_link_get_enable
 *
 * Report through *enabled the current status.enabled value of a
 * configured link.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                         unsigned int *enabled)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || !enabled) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    *enabled = lnk->status.enabled;
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_set_pong_count
 *
 * Set the pong_count of a configured link. A pong_count of 0 is
 * rejected with EINVAL.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_set_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                             uint8_t pong_count)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || (pong_count == 0)) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    lnk->pong_count = pong_count;
    rc = 0;

    log_debug(knet_h, KNET_SUB_LINK,
              "host: %u link: %u pong count update: %u",
              host_id, link_id, lnk->pong_count);

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_get_pong_count
 *
 * Report through *pong_count the pong_count currently stored for a
 * configured link.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                             uint8_t *pong_count)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || !pong_count) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    *pong_count = lnk->pong_count;
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_set_ping_timers
 *
 * Set ping interval, pong timeout and latency precision of a configured
 * link. interval and timeout are multiplied by 1000 before being stored
 * (the stored values are in microseconds); precision is stored as
 * latency_fix and also used to derive latency_exp.
 *
 * interval, timeout and precision must all be greater than 0.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_set_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                              time_t interval, time_t timeout, unsigned int precision)
{
    int savederrno = 0, err = 0;
    struct knet_host *host;
    struct knet_link *link;

    if (!knet_h) {
        errno = EINVAL;
        return -1;
    }

    if (link_id >= KNET_MAX_LINK) {
        errno = EINVAL;
        return -1;
    }

    /* time_t may be signed: negative timers make no sense, reject them like 0 */
    if ((interval <= 0) || (timeout <= 0)) {
        errno = EINVAL;
        return -1;
    }

    if (!precision) {
        errno = EINVAL;
        return -1;
    }

    savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
    if (savederrno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
                strerror(savederrno));
        errno = savederrno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        err = -1;
        savederrno = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(savederrno));
        goto exit_unlock;
    }

    link = &host->link[link_id];
    if (!link->configured) {
        err = -1;
        savederrno = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(savederrno));
        goto exit_unlock;
    }

    link->ping_interval = interval * 1000; /* microseconds */
    link->pong_timeout = timeout * 1000; /* microseconds */
    link->latency_fix = precision;
    link->latency_exp = precision - \
                        ((link->ping_interval * precision) / 8000000);

    /* precision is unsigned int, so it is printed with %u */
    log_debug(knet_h, KNET_SUB_LINK,
              "host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
              host_id, link_id, link->ping_interval, link->pong_timeout, precision);

exit_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = savederrno;
    return err;
}
/*
 * knet_link_get_ping_timers
 *
 * Report ping interval, pong timeout and latency precision of a
 * configured link. The stored interval/timeout are divided by 1000
 * before being returned, mirroring the multiplication done by
 * knet_link_set_ping_timers.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                              time_t *interval, time_t *timeout, unsigned int *precision)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) ||
        !interval || !timeout || !precision) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    /* convert the stored values back to the unit used by the API */
    *interval = lnk->ping_interval / 1000;
    *timeout = lnk->pong_timeout / 1000;
    *precision = lnk->latency_fix;
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_set_priority
 *
 * Change the priority of a configured link. Setting the value the link
 * already has is a successful no-op. After storing the new value
 * _host_dstcache_update_async() is called for the host; if that fails
 * the previous priority is restored and the call fails.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_set_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                           uint8_t priority)
{
    struct knet_host *host;
    struct knet_link *lnk;
    uint8_t previous;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK)) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    previous = lnk->priority;
    if (previous != priority) {
        lnk->priority = priority;

        if (_host_dstcache_update_async(knet_h, host)) {
            saved = errno;
            /* logged before the rollback, so the message shows the rejected value */
            log_debug(knet_h, KNET_SUB_LINK,
                      "Unable to update link priority (host: %u link: %u priority: %u): %s",
                      host_id, link_id, lnk->priority, strerror(saved));
            lnk->priority = previous;
            goto out_unlock;
        }

        log_debug(knet_h, KNET_SUB_LINK,
                  "host: %u link: %u priority set to: %u",
                  host_id, link_id, lnk->priority);
    }
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_get_priority
 *
 * Report through *priority the priority currently stored for a
 * configured link.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                           uint8_t *priority)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || !priority) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    *priority = lnk->priority;
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_get_link_list
 *
 * Fill link_ids with the ids of all configured links of a host, in
 * ascending order, and store how many were written in *link_ids_entries.
 * Up to KNET_MAX_LINK entries may be written, so link_ids must have room
 * for that many.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_link_list(knet_handle_t knet_h, uint16_t host_id,
                            uint8_t *link_ids, size_t *link_ids_entries)
{
    struct knet_host *host;
    int lock_errno;
    int saved = 0;
    int rc = -1;
    int idx;
    int found = 0;

    if (!knet_h || !link_ids || !link_ids_entries) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    for (idx = 0; idx < KNET_MAX_LINK; idx++) {
        if (host->link[idx].configured) {
            link_ids[found] = idx;
            found++;
        }
    }

    *link_ids_entries = found;
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
/*
 * knet_link_get_status
 *
 * Copy the current struct knet_link_status of a configured link into
 * the caller supplied buffer.
 *
 * Returns 0 on success, -1 on error with errno set.
 */
int knet_link_get_status(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id,
                         struct knet_link_status *status)
{
    struct knet_host *host;
    struct knet_link *lnk;
    int lock_errno;
    int saved = 0;
    int rc = -1;

    if (!knet_h || (link_id >= KNET_MAX_LINK) || !status) {
        errno = EINVAL;
        return -1;
    }

    lock_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
    if (lock_errno) {
        log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
                strerror(lock_errno));
        errno = lock_errno;
        return -1;
    }

    host = knet_h->host_index[host_id];
    if (!host) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
                host_id, strerror(saved));
        goto out_unlock;
    }

    lnk = &host->link[link_id];
    if (!lnk->configured) {
        saved = EINVAL;
        log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
                host_id, link_id, strerror(saved));
        goto out_unlock;
    }

    memmove(status, &lnk->status, sizeof(struct knet_link_status));
    rc = 0;

out_unlock:
    pthread_rwlock_unlock(&knet_h->global_rwlock);
    errno = saved;
    return rc;
}
diff --git a/libknet/listener.c b/libknet/listener.c
index 4278f779..7ed0bfe0 100644
--- a/libknet/listener.c
+++ b/libknet/listener.c
@@ -1,283 +1,185 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <string.h>
#include <stdlib.h>
#include "internals.h"
#include "common.h"
#include "logging.h"
#include "listener.h"
+#include "transports.h"
/*
 * _listener_add
 *
 * Attach link link_id of host host_id to a listener matching the link's
 * source address. The listener list is walked under the listener write lock;
 * an entry whose address compares equal (memcmp over the whole
 * sockaddr_storage) is reused, otherwise a new zeroed entry is allocated,
 * the transport's link_listener_start() is invoked and the entry is pushed
 * to the head of the list. Finally lnk->listener_sock is set from the
 * listener's sock field.
 *
 * Returns 0 on success, -1 on failure with errno set.
 *
 * NOTE(review): host_index[host_id] is dereferenced without a NULL check;
 * presumably callers validate host_id first - confirm.
 * NOTE(review): the result of link_listener_start() is not checked and
 * nothing visible in this function stores a socket into listener->sock
 * after the memset, so listener->sock may still be 0 when it is copied into
 * lnk->listener_sock - confirm how the transport publishes its socket.
 */
int _listener_add(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id)
{
- int value, count = 0;
- struct epoll_event ev;
+ int count = 0;
int savederrno = 0, err = 0;
struct knet_link *lnk = &knet_h->host_index[host_id]->link[link_id];
struct knet_listener *listener = NULL;
savederrno = pthread_rwlock_wrlock(&knet_h->listener_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LISTENER, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/* look for an existing listener bound to the same source address */
listener = knet_h->listener_head;
while (listener) {
count++;
log_debug(knet_h, KNET_SUB_LISTENER, "checking listener: %d", count);
if (!memcmp(&lnk->src_addr, &listener->address, sizeof(struct sockaddr_storage))) {
log_debug(knet_h, KNET_SUB_LISTENER, "found active listener");
break;
}
listener = listener->next;
}
/* none found: allocate a new entry and record the source address in it */
if (!listener) {
listener = malloc(sizeof(struct knet_listener));
if (!listener) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_LISTENER, "out of memory to allocate listener: %s",
strerror(savederrno));
goto exit_unlock;
}
memset(listener, 0, sizeof(struct knet_listener));
memmove(&listener->address, &lnk->src_addr, sizeof(struct sockaddr_storage));
-
- listener->sock = socket(listener->address.ss_family, SOCK_DGRAM, 0);
- if (listener->sock < 0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to create listener socket: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- value = KNET_RING_RCVBUFF;
- if (setsockopt(listener->sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set listener receive buffer: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- value = KNET_RING_RCVBUFF;
- if (setsockopt(listener->sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set listener send buffer: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- value = 1;
- if (setsockopt(listener->sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set FREEBIND on listener socket: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- if (listener->address.ss_family == AF_INET6) {
- value = 1;
- if (setsockopt(listener->sock, IPPROTO_IPV6, IPV6_V6ONLY,
- &value, sizeof(value)) < 0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set listener IPv6 only: %s",
- strerror(savederrno));
- goto exit_unlock;
-
- }
- value = IPV6_PMTUDISC_PROBE;
- if (setsockopt(listener->sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set PMTUDISC on listener socket: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
- } else {
- value = IP_PMTUDISC_PROBE;
- if (setsockopt(listener->sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set PMTUDISC on listener socket: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
- }
-
- if (_fdset_cloexec(listener->sock)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set listener CLOEXEC socket opts: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- if (_fdset_nonblock(listener->sock)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to set listener NONBLOCK socket opts: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- if (bind(listener->sock, (struct sockaddr *)&listener->address, sizeof(struct sockaddr_storage)) < 0) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to bind listener socket: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
-
- memset(&ev, 0, sizeof(struct epoll_event));
-
- ev.events = EPOLLIN;
- ev.data.fd = listener->sock;
-
- if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, listener->sock, &ev)) {
- savederrno = errno;
- err = -1;
- log_err(knet_h, KNET_SUB_LISTENER, "Unable to add listener to epoll pool: %s",
- strerror(savederrno));
- goto exit_unlock;
- }
+ knet_h->transport_ops->link_listener_start(knet_h, lnk->transport, link_id,
+ &lnk->src_addr, &lnk->dst_addr);
/* pushing new host to the front */
listener->next = knet_h->listener_head;
knet_h->listener_head = listener;
}
lnk->listener_sock = listener->sock;
exit_unlock:
/* on error, release a listener that was obtained above (close its socket, free it) */
if ((err) && (listener)) {
if (listener->sock >= 0) {
close(listener->sock);
}
free(listener);
listener = NULL;
}
pthread_rwlock_unlock(&knet_h->listener_rwlock);
errno = savederrno;
return err;
}
/*
 * _listener_remove
 *
 * Detach link link_id of host host_id from its listener and destroy the
 * listener when no enabled link references its socket any more.
 *
 * Under the listener write lock:
 *  - every enabled link of every host is scanned; if any of them uses the
 *    same listener socket the listener is kept, lnk->listener_sock is reset
 *    to 0 and -1/EBUSY is returned;
 *  - otherwise the listener owning lnk->listener_sock is unlinked from the
 *    list, the link transport is freed, the socket is removed from the
 *    receive epoll set, closed and the listener released.
 *
 * Returns 0 on success, -1 on failure with errno set (EBUSY when the
 * listener is still in use, EINVAL when no listener matches the link).
 *
 * NOTE(review): host_index[host_id] is dereferenced without a NULL check;
 * presumably callers validate host_id first - confirm.
 */
int _listener_remove(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id)
{
int err = 0, savederrno = 0;
int link_idx;
struct epoll_event ev; /* kernel < 2.6.9 bug (see epoll_ctl man) */
struct knet_host *host;
struct knet_link *lnk = &knet_h->host_index[host_id]->link[link_id];
struct knet_listener *tmp_listener;
struct knet_listener *listener;
int listener_cnt = 0;
/* EPOLL_CTL_DEL ignores the event, but never hand the kernel stack garbage */
memset(&ev, 0, sizeof(ev));
savederrno = pthread_rwlock_wrlock(&knet_h->listener_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LISTENER, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
/* checking if listener is in use */
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].status.enabled != 1)
continue;
if (host->link[link_idx].listener_sock == lnk->listener_sock) {
listener_cnt++;
}
}
}
if (listener_cnt) {
lnk->listener_sock = 0;
log_debug(knet_h, KNET_SUB_LISTENER, "listener_remove: listener still in use");
savederrno = EBUSY;
err = -1;
goto exit_unlock;
}
listener = knet_h->listener_head;
while (listener) {
if (listener->sock == lnk->listener_sock)
break;
listener = listener->next;
}
/*
 * no listener owns this socket: bail out instead of dereferencing NULL
 * below (listener->sock, and listener_head->next when the list is empty)
 */
if (!listener) {
savederrno = EINVAL;
err = -1;
log_debug(knet_h, KNET_SUB_LISTENER, "listener_remove: no listener found for link");
goto exit_unlock;
}
/* TODO: use a doubly-linked list? */
if (listener == knet_h->listener_head) {
knet_h->listener_head = knet_h->listener_head->next;
} else {
for (tmp_listener = knet_h->listener_head; tmp_listener != NULL; tmp_listener = tmp_listener->next) {
if (listener == tmp_listener->next) {
tmp_listener->next = tmp_listener->next->next;
break;
}
}
}
+ knet_h->transport_ops->link_free(lnk->transport);
+ lnk->transport = NULL;
+
epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, listener->sock, &ev);
close(listener->sock);
free(listener);
exit_unlock:
pthread_rwlock_unlock(&knet_h->listener_rwlock);
errno = savederrno;
return err;
}
#if 0
/*
 * socket_debug
 *
 * Debug helper: resolve the local address bound to sockfd and log it in
 * numeric host/port form. Failures of getsockname/getnameinfo are logged
 * and otherwise ignored.
 */
void socket_debug(knet_handle_t knet_h, int sockfd)
{
	struct sockaddr_storage local_addr;
	socklen_t local_len = sizeof(struct sockaddr_storage);
	char host_str[KNET_MAX_HOST_LEN];
	char port_str[KNET_MAX_PORT_LEN];
	int rc;

	memset(&host_str, 0, KNET_MAX_HOST_LEN);
	memset(&port_str, 0, KNET_MAX_PORT_LEN);

	if (getsockname(sockfd, (struct sockaddr *)&local_addr, &local_len) < 0) {
		log_debug(knet_h, KNET_SUB_LINK_T, "Unable to getsockname: %s", strerror(errno));
		return;
	}

	rc = getnameinfo((const struct sockaddr *)&local_addr, sizeof(struct sockaddr_storage),
			 (char *)&host_str, KNET_MAX_HOST_LEN, (char *)&port_str, KNET_MAX_PORT_LEN,
			 NI_NUMERICHOST | NI_NUMERICSERV);
	if (rc) {
		log_debug(knet_h, KNET_SUB_LINK_T, "Unable to getnameinfo: %d", rc);
		return;
	}

	log_debug(knet_h, KNET_SUB_LINK_T, "Sock host: %s port: %s", host_str, port_str);
}
#endif
diff --git a/libknet/logging.c b/libknet/logging.c
index efd2f447..f710cde6 100644
--- a/libknet/logging.c
+++ b/libknet/logging.c
@@ -1,221 +1,222 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <stdarg.h>
#include <errno.h>
#include <stdio.h>
#include "internals.h"
#include "logging.h"
/*
 * One entry of a name <-> numeric id lookup table, used below for both the
 * subsystem and the log level tables.
 */
struct pretty_names {
const char *name; /* printable name */
uint8_t val; /* numeric id the name maps to */
};
/*
 * Printable names of the logging subsystems, searched linearly by
 * knet_log_get_subsystem_name() and knet_log_get_subsystem_id().
 */
static struct pretty_names subsystem_names[] =
{
{ "common", KNET_SUB_COMMON },
{ "handle", KNET_SUB_HANDLE },
{ "host", KNET_SUB_HOST },
{ "listener", KNET_SUB_LISTENER },
{ "link", KNET_SUB_LINK },
{ "pmtud", KNET_SUB_PMTUD },
{ "send_t", KNET_SUB_SEND_T },
{ "link_t", KNET_SUB_LINK_T },
{ "hb_t", KNET_SUB_HB_T },
{ "switch_t", KNET_SUB_SWITCH_T },
{ "pmtud_t", KNET_SUB_PMTUD_T },
+ { "udp", KNET_SUB_UDP_LINK_T },
{ "filter", KNET_SUB_FILTER },
{ "crypto", KNET_SUB_CRYPTO },
{ "nsscrypto", KNET_SUB_NSSCRYPTO }
};
/*
 * knet_log_get_subsystem_name
 *
 * Translate a subsystem id into its printable name.
 *
 * The search is bounded by the real size of subsystem_names so that adding
 * or removing table entries can neither read past the end of the table nor
 * leave trailing entries unreachable.
 *
 * Returns the matching name, or "common" when the id is unknown.
 */
const char *knet_log_get_subsystem_name(uint8_t subsystem)
{
	const size_t entries = sizeof(subsystem_names) / sizeof(subsystem_names[0]);
	size_t i;

	for (i = 0; i < entries; i++) {
		if (subsystem_names[i].val == subsystem) {
			return subsystem_names[i].name;
		}
	}
	return "common";
}
/*
 * knet_log_get_subsystem_id
 *
 * Translate a subsystem name (case insensitive) into its numeric id.
 *
 * The search is bounded by the real size of subsystem_names, and a NULL
 * name is treated as unknown instead of being passed to strcasecmp().
 *
 * Returns the matching id, or KNET_SUB_COMMON when the name is NULL or
 * unknown.
 */
uint8_t knet_log_get_subsystem_id(const char *name)
{
	const size_t entries = sizeof(subsystem_names) / sizeof(subsystem_names[0]);
	size_t i;

	if (!name) {
		return KNET_SUB_COMMON;
	}

	for (i = 0; i < entries; i++) {
		if (strcasecmp(name, subsystem_names[i].name) == 0) {
			return subsystem_names[i].val;
		}
	}
	return KNET_SUB_COMMON;
}
/*
 * Printable names of the log levels, searched linearly by
 * knet_log_get_loglevel_name() and knet_log_get_loglevel_id().
 */
static struct pretty_names loglevel_names[] =
{
{ "ERROR", KNET_LOG_ERR },
{ "WARNING", KNET_LOG_WARN },
{ "info", KNET_LOG_INFO },
{ "debug", KNET_LOG_DEBUG }
};
/*
 * knet_log_get_loglevel_name
 *
 * Translate a log level id into its printable name.
 *
 * The search is bounded by the real size of loglevel_names rather than by
 * the value of KNET_LOG_DEBUG, so the loop cannot run past the table.
 *
 * Returns the matching name, or "ERROR" when the level is unknown.
 */
const char *knet_log_get_loglevel_name(uint8_t level)
{
	const size_t entries = sizeof(loglevel_names) / sizeof(loglevel_names[0]);
	size_t i;

	for (i = 0; i < entries; i++) {
		if (loglevel_names[i].val == level) {
			return loglevel_names[i].name;
		}
	}
	return "ERROR";
}
/*
 * knet_log_get_loglevel_id
 *
 * Translate a log level name (case insensitive) into its numeric id.
 *
 * The search is bounded by the real size of loglevel_names, and a NULL
 * name is treated as unknown instead of being passed to strcasecmp().
 *
 * Returns the matching id, or KNET_LOG_ERR when the name is NULL or
 * unknown.
 */
uint8_t knet_log_get_loglevel_id(const char *name)
{
	const size_t entries = sizeof(loglevel_names) / sizeof(loglevel_names[0]);
	size_t i;

	if (!name) {
		return KNET_LOG_ERR;
	}

	for (i = 0; i < entries; i++) {
		if (strcasecmp(name, loglevel_names[i].name) == 0) {
			return loglevel_names[i].val;
		}
	}
	return KNET_LOG_ERR;
}
/*
 * knet_log_set_loglevel
 *
 * Set the log level of one subsystem, under the global write lock.
 *
 * knet_h    - knet handle (must not be NULL)
 * subsystem - subsystem id, must be <= KNET_SUB_LAST
 * level     - new level, must be <= KNET_LOG_DEBUG
 *
 * Returns 0 on success, -1 on error with errno set (EINVAL for bad
 * arguments, the lock error otherwise).
 */
int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
			  uint8_t level)
{
	int lock_rc;

	if ((!knet_h) || (subsystem > KNET_SUB_LAST) || (level > KNET_LOG_DEBUG)) {
		errno = EINVAL;
		return -1;
	}

	lock_rc = pthread_rwlock_wrlock(&knet_h->global_rwlock);
	if (lock_rc != 0) {
		log_err(knet_h, subsystem, "Unable to get write lock: %s",
			strerror(lock_rc));
		errno = lock_rc;
		return -1;
	}

	knet_h->log_levels[subsystem] = level;

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	return 0;
}
/*
 * knet_log_get_loglevel
 *
 * Read the current log level of one subsystem, under the global read lock.
 *
 * knet_h    - knet handle (must not be NULL)
 * subsystem - subsystem id, must be <= KNET_SUB_LAST
 * level     - where to store the level (must not be NULL)
 *
 * Returns 0 on success, -1 on error with errno set (EINVAL for bad
 * arguments, the lock error otherwise).
 */
int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
			  uint8_t *level)
{
	int savederrno = 0;

	if (!knet_h) {
		errno = EINVAL;
		return -1;
	}

	if (subsystem > KNET_SUB_LAST) {
		errno = EINVAL;
		return -1;
	}

	if (!level) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		/* this is a read lock: report it as such */
		log_err(knet_h, subsystem, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	*level = knet_h->log_levels[subsystem];

	pthread_rwlock_unlock(&knet_h->global_rwlock);
	return 0;
}
/*
 * log_msg
 *
 * Format a log message and write it, as one fixed size struct knet_log_msg
 * record, to knet_h->logfd.
 *
 * The message is dropped silently when the handle is NULL, the subsystem is
 * out of range, msglevel is above the level configured for the subsystem,
 * or no log fd is set.
 *
 * Compared to the previous version:
 *  - a failed write() no longer returns while still holding the read lock;
 *  - after a partial write() the remaining bytes are sent from the current
 *    offset instead of restarting from the beginning of the record.
 */
void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel,
	     const char *fmt, ...)
{
	va_list ap;
	struct knet_log_msg msg;
	size_t byte_cnt = 0;
	size_t msg_len;
	ssize_t written;
	int err;

	if ((!knet_h) ||
	    (subsystem > KNET_SUB_LAST) ||
	    (msglevel > knet_h->log_levels[subsystem]))
		return;

	/*
	 * most logging calls will take place with locking in place, so the
	 * lock is only tried, never waited for. The message is dropped when
	 * tryrdlock reports EAGAIN and locking is initialized; for any other
	 * error we carry on without holding the lock.
	 * NOTE(review): the original comment mentioned EINVAL while the code
	 * tests EAGAIN - confirm which error was intended.
	 */
	err = pthread_rwlock_tryrdlock(&knet_h->global_rwlock);
	if ((err == EAGAIN) && (knet_h->lock_init_done))
		return;

	if (knet_h->logfd <= 0)
		goto out_unlock;

	memset(&msg, 0, sizeof(struct knet_log_msg));
	msg.subsystem = subsystem;
	msg.msglevel = msglevel;

	va_start(ap, fmt);
	vsnprintf(msg.msg, sizeof(msg.msg) - 2, fmt, ap);
	va_end(ap);

	/* the newline goes right after the string terminator, as before */
	msg_len = strlen(msg.msg);
	msg.msg[msg_len + 1] = '\n';

	while (byte_cnt < sizeof(struct knet_log_msg)) {
		written = write(knet_h->logfd, (const char *)&msg + byte_cnt,
				sizeof(struct knet_log_msg) - byte_cnt);
		if (written <= 0)
			goto out_unlock;
		byte_cnt += (size_t)written;
	}

out_unlock:
	/*
	 * unlock only if we are holding the lock
	 */
	if (!err)
		pthread_rwlock_unlock(&knet_h->global_rwlock);

	return;
}
diff --git a/libknet/onwire.h b/libknet/onwire.h
index 825e4466..e2b8ec6f 100644
--- a/libknet/onwire.h
+++ b/libknet/onwire.h
@@ -1,192 +1,195 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __ONWIRE_H__
#define __ONWIRE_H__
/*
* data structures to define network packets.
* Start from knet_header at the bottom
*/
#include <stdint.h>
#if 0
/*
 * for future protocol extension (re-switching table calculation)
 *
 * Everything in this block is compiled out; the structures only sketch the
 * intended layout of a per-node link table.
 */
/* description of a single link of a node */
struct knet_hinfo_link {
uint8_t khl_link_id;
uint8_t khl_link_dynamic;
uint8_t khl_link_priority;
uint64_t khl_link_latency;
char khl_link_dst_ipaddr[KNET_MAX_HOST_LEN];
char khl_link_dst_port[KNET_MAX_PORT_LEN];
} __attribute__((packed));
/* all the links of one node */
struct knet_hinfo_link_table {
uint16_t khlt_node_id;
uint8_t khlt_local; /* we have this node connected locally */
struct knet_hinfo_link khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */
} __attribute__((packed));
/* header followed by khdt_host_entries link tables */
struct link_table {
uint16_t khdt_host_entries;
uint8_t khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */
} __attribute__((packed));
#endif
/* values carried in khip_link_status_status */
#define KNET_HOSTINFO_LINK_STATUS_DOWN 0
#define KNET_HOSTINFO_LINK_STATUS_UP 1
/* hostinfo payload announcing the up/down state of one link (packed) */
struct knet_hostinfo_payload_link_status {
uint8_t khip_link_status_link_id; /* link id */
uint8_t khip_link_status_status; /* up/down status */
} __attribute__((packed));
/*
 * union to reference possible individual payloads
 * (link status is the only member defined so far)
 */
union knet_hostinfo_payload {
struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status;
} __attribute__((packed));
/*
 * due to the nature of knet_hostinfo, we are currently
 * sending this data as part of knet_header_payload_data.khp_data_userdata
 * to avoid a union that would increase the size of knet_header_payload_data
 * unnecessarily.
 * This might change later on depending on how we implement
 * host info exchange
 */
/* values for khi_type */
#define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0
#define KNET_HOSTINFO_TYPE_LINK_TABLE 1 // NOT IMPLEMENTED
/* values for khi_bcast */
#define KNET_HOSTINFO_UCAST 0 /* send info to a specific host */
#define KNET_HOSTINFO_BCAST 1 /* send info to all known / connected hosts */
/* host information message: fixed header followed by a type specific payload (packed) */
struct knet_hostinfo {
uint8_t khi_type; /* type of hostinfo we are sending */
uint8_t khi_bcast; /* hostinfo destination bcast/ucast */
uint16_t khi_dst_node_id;/* used only if in ucast mode */
union knet_hostinfo_payload khi_payload;
} __attribute__((packed));
/*
 * hostinfo sizes: the whole struct, the header without the payload union,
 * and the header plus a link status payload
 */
#define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo)
#define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload))
#define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status))
/* shortcuts to reach the link status fields directly from a struct knet_hostinfo */
#define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status
#define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id
/*
 * Packet sequence number type and its maximum value.
 * A 64 bit alternative is kept here for reference:
 *
 * typedef uint64_t seq_num_t;
 * #define SEQ_MAX UINT64_MAX
 */
typedef uint16_t seq_num_t;
#define SEQ_MAX UINT16_MAX
/*
 * payload header of a data packet (packed); the user data follows the
 * fixed fields directly
 */
struct knet_header_payload_data {
seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pkcts */
uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */
uint8_t khp_data_pad2;
uint8_t khp_data_bcast; /* data destination bcast/ucast */
uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */
uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */
int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */
uint8_t khp_data_userdata[0]; /* pointer to the real user data */
} __attribute__((packed));
/*
 * payload of a heartbeat (ping/pong) packet (packed).
 * The heartbeat sender copies a struct timespec into khp_ping_time.
 */
struct knet_header_payload_ping {
uint8_t khp_ping_link; /* source link id */
uint32_t khp_ping_time[4]; /* ping timestamp */
} __attribute__((packed));
/* taken from tracepath6 */
/* upper bound of the on-wire packet size probed by PMTUD, per address family */
#define KNET_PMTUD_SIZE_V4 65535
#define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4
-#define KNET_PMTUD_OVERHEAD_V4 28
-#define KNET_PMTUD_OVERHEAD_V6 48
+
+/* These two get the protocol-specific overheads added to them */
+#define KNET_PMTUD_OVERHEAD_V4 20
+#define KNET_PMTUD_OVERHEAD_V6 40
+
/* smallest MTU PMTUD accepts as a valid result, per address family */
#define KNET_PMTUD_MIN_MTU_V4 576
#define KNET_PMTUD_MIN_MTU_V6 1280
/*
 * payload of a path MTU discovery packet (packed); padding data follows
 * the fixed fields directly
 */
struct knet_header_payload_pmtud {
uint8_t khp_pmtud_link; /* source link id */
uint16_t khp_pmtud_size; /* size of the current packet */
uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */
} __attribute__((packed));
/*
 * union to reference possible individual payloads
 * (which member is valid is told by kh_type in struct knet_header)
 */
union knet_header_payload {
struct knet_header_payload_data khp_data; /* pure data packet struct */
struct knet_header_payload_ping khp_ping; /* heartbeat packet struct */
struct knet_header_payload_pmtud khp_pmtud; /* Path MTU discovery packet struct */
} __attribute__((packed));
/*
 * starting point
 *
 * protocol version and the packet types stored in kh_type
 */
#define KNET_HEADER_VERSION 0x01 /* we currently support only one version */
#define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */
#define KNET_HEADER_TYPE_HOST_INFO 0x01 /* host status information pckt */
#define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */
#define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */
#define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */
#define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */
#define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */
/* common header placed in front of every knet packet (packed) */
struct knet_header {
uint8_t kh_version; /* pckt format/version */
uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */
uint16_t kh_node; /* host id of the source host for this pckt */
uint8_t kh_pad1; /* make sure to have space in the header to grow features */
uint8_t kh_pad2;
union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */
} __attribute__((packed));
/*
 * convenience defines to hide structure nesting
 * (needs review and cleanup)
 */
/* data packet fields, reachable directly from a struct knet_header */
#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num
#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num
#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq
#define khp_data_userdata kh_payload.khp_data.khp_data_userdata
#define khp_data_bcast kh_payload.khp_data.khp_data_bcast
#define khp_data_channel kh_payload.khp_data.khp_data_channel
/* heartbeat packet fields */
#define khp_ping_link kh_payload.khp_ping.khp_ping_link
#define khp_ping_time kh_payload.khp_ping.khp_ping_time
/* PMTUD packet fields */
#define khp_pmtud_link kh_payload.khp_pmtud.khp_pmtud_link
#define khp_pmtud_size kh_payload.khp_pmtud.khp_pmtud_size
#define khp_pmtud_data kh_payload.khp_pmtud.khp_pmtud_data
/*
 * extra defines to avoid mingling with sizeof() too much
 *
 * KNET_HEADER_ALL_SIZE is the whole struct knet_header, KNET_HEADER_SIZE is
 * the header without the payload union, and the remaining ones add the
 * size of one specific payload struct to KNET_HEADER_SIZE.
 */
#define KNET_HEADER_ALL_SIZE sizeof(struct knet_header)
#define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload))
#define KNET_HEADER_PING_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping))
#define KNET_HEADER_PMTUD_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud))
#define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data))
#endif
diff --git a/libknet/threads_heartbeat.c b/libknet/threads_heartbeat.c
index 82df36a4..1322f14b 100644
--- a/libknet/threads_heartbeat.c
+++ b/libknet/threads_heartbeat.c
@@ -1,123 +1,123 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
#include "link.h"
#include "logging.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
/*
 * _handle_check_each
 *
 * Heartbeat work for a single link:
 *  1) when at least ping_interval has elapsed since the last ping, stamp the
 *     shared ping buffer with the current monotonic time and the link id,
 *     optionally encrypt it, and send it to the link destination;
 *  2) when a pong has been seen before (pong_last.tv_nsec != 0) and
 *     pong_timeout has elapsed since then, reset the pong state and, if the
 *     link was connected, log it and report it down via _link_updown().
 *
 * NOTE(review): ping_interval and pong_timeout are multiplied by 1000 before
 * being compared with the timespec_diff() result; presumably the former are
 * microseconds and the latter nanoseconds - confirm.
 */
static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int len;
ssize_t outlen = KNET_HEADER_PING_SIZE;
struct timespec clock_now, pong_last;
unsigned long long diff_ping;
unsigned char *outbuf = (unsigned char *)knet_h->pingbuf;
/* caching last pong to avoid race conditions */
pong_last = dst_link->status.pong_last;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_HB_T, "Unable to get monotonic clock");
return;
}
timespec_diff(dst_link->ping_last, clock_now, &diff_ping);
if (diff_ping >= (dst_link->ping_interval * 1000llu)) {
/* the current timestamp is the ping payload */
memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec));
knet_h->pingbuf->khp_ping_link = dst_link->link_id;
if (knet_h->crypto_instance) {
/* outlen is updated by the crypto layer; the encrypted buffer is sent instead */
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pingbuf,
outlen,
knet_h->pingbuf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_HB_T, "Unable to crypto ping packet");
return;
}
outbuf = knet_h->pingbuf_crypt;
}
- len = sendto(dst_link->listener_sock, outbuf, outlen,
+ len = sendto(dst_link->outsock, outbuf, outlen,
MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr,
sizeof(struct sockaddr_storage));
/* ping_last is updated whether or not the send succeeded */
dst_link->ping_last = clock_now;
if (len != outlen) {
log_debug(knet_h, KNET_SUB_HB_T,
"Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
- dst_link->listener_sock, errno, strerror(errno),
+ dst_link->outsock, errno, strerror(errno),
dst_link->status.src_ipaddr, dst_link->status.src_port,
dst_link->status.dst_ipaddr, dst_link->status.dst_port);
} else {
/* size of the last ping that was sent in full */
dst_link->last_ping_size = outlen;
}
}
/* pong timeout check, based on the pong time cached at function entry */
timespec_diff(pong_last, clock_now, &diff_ping);
if ((pong_last.tv_nsec) &&
(diff_ping >= (dst_link->pong_timeout * 1000llu))) {
dst_link->received_pong = 0;
dst_link->status.pong_last.tv_nsec = 0;
if (dst_link->status.connected == 1) {
log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
dst_host->host_id, dst_link->link_id);
_link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0);
}
}
}
void *_handle_heartbt_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
int link_idx;
/* preparing ping buffer */
knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
knet_h->pingbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_HB_T, "Unable to get read lock");
continue;
}
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;
_handle_check_each(knet_h, dst_host, &dst_host->link[link_idx]);
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index eef74c76..16a816a6 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,388 +1,388 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "link.h"
#include "host.h"
#include "logging.h"
#include "threads_common.h"
#include "threads_pmtud.h"
/*
 * _handle_check_link_pmtud
 *
 * Run one path MTU discovery cycle on dst_link.
 *
 * Probing starts at the maximum size for the address family and then
 * bisects between last_good_mtu (largest size acknowledged by the peer) and
 * last_bad_mtu (smallest size that failed), sending a PMTUD packet of the
 * candidate size and waiting up to 2 seconds on pmtud_cond for the reply.
 * With crypto enabled the payload size is adjusted for padding and for the
 * hash/salt/block overhead before sending.
 *
 * On success the discovered value (on-wire size minus proto_overhead) is
 * stored in dst_link->status.mtu.
 *
 * Returns 0 when an MTU has been found, -1 when the process is aborted
 * (unknown protocol, too many attempts, crypto or send failure, link down,
 * locking problems or shutdown).
 *
 * Fix compared to the previous version: the AF_INET branch seeded
 * last_good_mtu with the IPv6 overhead (copy/paste from the AF_INET6
 * branch); it now uses KNET_PMTUD_OVERHEAD_V4.
 */
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int ret, savederrno, mutex_retry_limit, failsafe;
ssize_t onwire_len; /* current packet onwire size */
ssize_t overhead_len; /* onwire packet overhead (protocol based) */
ssize_t max_mtu_len; /* max mtu for protocol */
ssize_t data_len; /* how much data we can send in the packet
* generally would be onwire_len - overhead_len
* needs to be adjusted for crypto
*/
ssize_t pad_len; /* crypto packet pad size, needs to move into crypto.c callbacks */
int len; /* len of what we were able to sendto onwire */
struct timespec ts;
unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;
mutex_retry_limit = 0;
failsafe = 0;
pad_len = 0;
- dst_link->last_bad_mtu = 0;
+ dst_link->last_bad_mtu = knet_h->transport_ops->link_get_mtu_overhead(dst_link->transport);
knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;
/*
 * per address family limits; the last ping that went through gives the
 * initial known-good on-wire size
 */
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
max_mtu_len = KNET_PMTUD_SIZE_V6;
overhead_len = KNET_PMTUD_OVERHEAD_V6;
dst_link->last_good_mtu = dst_link->last_ping_size + KNET_PMTUD_OVERHEAD_V6;
break;
case AF_INET:
max_mtu_len = KNET_PMTUD_SIZE_V4;
overhead_len = KNET_PMTUD_OVERHEAD_V4;
dst_link->last_good_mtu = dst_link->last_ping_size + KNET_PMTUD_OVERHEAD_V4;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD_T, "PMTUD aborted, unknown protocol");
return -1;
break;
}
/*
* discovery starts from the top because kernel will
* refuse to send packets > current iface mtu.
* this saves us some time and network bw.
*/
onwire_len = max_mtu_len;
restart:
/*
* prevent a race when interface mtu is changed _exactly_ during
* the discovery process and it's complex to detect. Easier
* to wait the next loop.
* 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't
* take more than 18/19 steps.
*/
if (failsafe == 30) {
log_err(knet_h, KNET_SUB_PMTUD_T,
"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
return -1;
} else {
failsafe++;
}
data_len = onwire_len - overhead_len;
if (knet_h->crypto_instance) {
/* round the payload up to the cipher block size */
if (knet_h->sec_block_size) {
pad_len = knet_h->sec_block_size - (data_len % knet_h->sec_block_size);
if (pad_len == knet_h->sec_block_size) {
pad_len = 0;
}
data_len = data_len + pad_len;
}
/* account for hash, salt and one extra block */
data_len = data_len + (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
/* shrink again until the packet fits below the protocol maximum */
if (knet_h->sec_block_size) {
while (data_len + overhead_len >= max_mtu_len) {
data_len = data_len - knet_h->sec_block_size;
}
}
/* ... and below the smallest size already known to fail */
if (dst_link->last_bad_mtu) {
while (data_len + overhead_len >= dst_link->last_bad_mtu) {
data_len = data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
}
}
if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size) + 1) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
return -1;
}
onwire_len = data_len + overhead_len;
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
/* data_len is updated by the crypto layer with the encrypted size */
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pmtudbuf,
data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size),
knet_h->pmtudbuf_crypt,
&data_len) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to crypto pmtud packet");
return -1;
}
outbuf = knet_h->pmtudbuf_crypt;
} else {
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
}
/* link has gone down, aborting pmtud */
if (dst_link->status.connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
/* pmtud_mutex is held from here until the reply has been evaluated */
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to get mutex lock");
return -1;
}
- len = sendto(dst_link->listener_sock, outbuf, data_len,
+ len = sendto(dst_link->outsock, outbuf, data_len,
MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
/* EMSGSIZE is an expected answer while probing, everything else aborts */
if ((len < 0) && (savederrno != EMSGSIZE)) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
if (len != data_len) {
/*
* this is coming from "localhost" already.
*/
if (savederrno == EMSGSIZE) {
dst_link->last_bad_mtu = onwire_len;
} else {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
}
} else {
dst_link->last_sent_mtu = onwire_len;
dst_link->last_recv_mtu = 0;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to get current time: %s", strerror(errno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
/*
* Set an arbitrary 2 seconds timeout to receive a PMTUd reply
* perhaps this should be configurable but:
* 1) too short timeout can cause instability since MTU value
* influences link status
* 2) too high timeout slows down the MTU detection process for
* small MTU
*
* Another option is to make the PMTUd process less influential
* in link status detection but that could cause data packet loss
* without link up/down changes
*/
ts.tv_sec += 2;
ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);
if (shutdown_in_progress(knet_h)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_debug(knet_h, KNET_SUB_PMTUD_T, "PMTUD aborted. shutdown in progress");
return -1;
}
/* unexpected wait error: retry the same size a few times before giving up */
if ((ret != 0) && (ret != ETIMEDOUT)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (mutex_retry_limit == 3) {
log_debug(knet_h, KNET_SUB_PMTUD_T, "PMTUD aborted, unable to get mutex lock");
return -1;
}
mutex_retry_limit++;
goto restart;
}
/* timeout or a reply for a different size: this size is bad */
if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
dst_link->last_bad_mtu = onwire_len;
} else {
int found_mtu = 0;
/*
 * the search is over when the good size is the protocol maximum or
 * directly below the smallest bad size (within one cipher block when
 * crypto padding is in use)
 */
if (knet_h->sec_block_size) {
if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
found_mtu = 1;
}
} else {
if ((onwire_len == max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1)))) {
found_mtu = 1;
}
}
if (found_mtu) {
/*
* account for IP overhead, knet headers and crypto in PMTU calculation
*/
dst_link->status.mtu = onwire_len - dst_link->status.proto_overhead;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
dst_link->last_good_mtu = onwire_len;
}
}
/* bisect between the largest good and the smallest bad size */
onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
goto restart;
}
/*
 * _handle_check_pmtud
 *
 * Run path MTU discovery on dst_link when at least pmtud_interval seconds
 * have passed since the last completed run.
 *
 * When the interval has not elapsed yet, *min_mtu is set to the MTU already
 * recorded for the link and the cached validity flag is returned.
 * Otherwise proto_overhead is refreshed for the address family, discovery
 * is run, the result is range checked against the per-family minimum and
 * maximum, and *min_mtu is lowered if this link's MTU is smaller. A change
 * of the validity flag triggers a destination cache update for the host.
 *
 * Returns the link's has_valid_mtu flag, or 0 when the monotonic clock
 * cannot be read.
 */
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, unsigned int *min_mtu)
{
	unsigned long long interval_ns = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
	unsigned long long elapsed_ns;
	struct timespec now;
	unsigned int prev_mtu;
	uint8_t prev_valid;

	if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
		log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to get monotonic clock");
		return 0;
	}

	timespec_diff(dst_link->pmtud_last, now, &elapsed_ns);

	/* too early for a new run: hand back the cached result */
	if (elapsed_ns < interval_ns) {
		*min_mtu = dst_link->status.mtu;
		return dst_link->has_valid_mtu;
	}

	switch (dst_link->dst_addr.ss_family) {
		case AF_INET6:
			dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
			break;
		case AF_INET:
			dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
			break;
	}

	prev_mtu = dst_link->status.mtu;
	prev_valid = dst_link->has_valid_mtu;

	log_debug(knet_h, KNET_SUB_PMTUD_T, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);

	if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
		dst_link->has_valid_mtu = 0;
		goto sync_dstcache;
	}

	dst_link->has_valid_mtu = 1;

	/* reject results outside of the sane range for the address family */
	switch (dst_link->dst_addr.ss_family) {
		case AF_INET6:
			if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V6) ||
			    ((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V6)) {
				log_debug(knet_h, KNET_SUB_PMTUD_T,
					  "PMTUD detected an IPv6 MTU out of bound value (%u) for host: %u link: %u.",
					  dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
				dst_link->has_valid_mtu = 0;
			}
			break;
		case AF_INET:
			if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V4) ||
			    ((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V4)) {
				log_debug(knet_h, KNET_SUB_PMTUD_T,
					  "PMTUD detected an IPv4 MTU out of bound value (%u) for host: %u link: %u.",
					  dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
				dst_link->has_valid_mtu = 0;
			}
			break;
	}

	if (!dst_link->has_valid_mtu) {
		goto sync_dstcache;
	}

	if ((prev_mtu) && (prev_mtu != dst_link->status.mtu)) {
		log_info(knet_h, KNET_SUB_PMTUD_T, "PMTUD link change for host: %u link: %u from %u to %u",
			 dst_host->host_id, dst_link->link_id, prev_mtu, dst_link->status.mtu);
	}
	log_debug(knet_h, KNET_SUB_PMTUD_T, "PMTUD completed for host: %u link: %u current link mtu: %u",
		  dst_host->host_id, dst_link->link_id, dst_link->status.mtu);

	if (dst_link->status.mtu < *min_mtu) {
		*min_mtu = dst_link->status.mtu;
	}

	/* only a valid result restarts the interval timer */
	dst_link->pmtud_last = now;

sync_dstcache:
	if (prev_valid != dst_link->has_valid_mtu) {
		_host_dstcache_update_sync(knet_h, dst_host);
	}

	return dst_link->has_valid_mtu;
}
/*
 * PMTUd worker thread.
 *
 * data is the knet handle. Until shutdown is requested the thread wakes up
 * every KNET_THREADS_TIMERES, runs _handle_check_pmtud() on every link that
 * is enabled, connected and has been pinged at least once (dynamic links
 * must also be dynconnected), and publishes the smallest MTU found as
 * knet_h->data_mtu, notifying pmtud_notify_fn when the value changes.
 *
 * Always returns NULL.
 */
void *_handle_pmtud_link_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct knet_host *host;
	struct knet_link *link;
	int idx;
	unsigned int lowest_mtu, mtu_found;

	/* start from the most conservative value until a probe succeeds */
	knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;

	/* the probe header is constant: fill it in once */
	knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
	knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
	knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);

	while (!shutdown_in_progress(knet_h)) {
		usleep(KNET_THREADS_TIMERES);

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_PMTUD_T, "Unable to get read lock");
			continue;
		}

		/* upper bound for this round; each checked link may lower it */
		lowest_mtu = KNET_PMTUD_SIZE_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
		mtu_found = 0;

		for (host = knet_h->host_head; host != NULL; host = host->next) {
			for (idx = 0; idx < KNET_MAX_LINK; idx++) {
				link = &host->link[idx];

				if ((link->status.enabled == 1) &&
				    (link->status.connected == 1) &&
				    (link->last_ping_size) &&
				    ((link->dynamic != KNET_LINK_DYNIP) ||
				     (link->status.dynconnected == 1))) {
					if (_handle_check_pmtud(knet_h, host, link, &lowest_mtu)) {
						mtu_found = 1;
					}
				}
			}
		}

		/* only publish (and notify) when at least one link has a valid MTU */
		if ((mtu_found) && (knet_h->data_mtu != lowest_mtu)) {
			knet_h->data_mtu = lowest_mtu;
			log_info(knet_h, KNET_SUB_PMTUD_T, "Global data MTU changed to: %u", knet_h->data_mtu);
			if (knet_h->pmtud_notify_fn) {
				knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
							knet_h->data_mtu);
			}
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	return NULL;
}
diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c
index aa645c4f..d7b34430 100644
--- a/libknet/threads_send_recv.c
+++ b/libknet/threads_send_recv.c
@@ -1,1185 +1,1216 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <math.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "crypto.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
+#include "transports.h"
#include "threads_common.h"
#include "threads_send_recv.h"
/*
* SEND
*/
/*
 * Send the already prepared fragments in iov_out to dst_host.
 *
 * The number of fragments is read from
 * knet_h->send_to_links_buf[0]->khp_data_frag_num. The fragments are sent
 * with sendmmsg() on every active link of the host; with the round-robin
 * policy and more than one active link, only the first active link is used
 * and the active link list is then rotated by one.
 *
 * Returns 0 on success, -1 on failure with errno set (EAGAIN when a partial
 * send makes no further progress, otherwise the errno left by sendmmsg()).
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out)
{
int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress;
struct mmsghdr msg[PCKT_FRAG_MAX];
int err = 0, savederrno = 0;
/* only the first element is cleared here; each msg_hdr in use is cleared again below */
memset(&msg, 0, sizeof(struct mmsghdr));
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num;
sent_msgs = 0;
prev_sent = 0;
progress = 1;
retry:
/* (re)build the message vector, skipping the prev_sent fragments already sent */
msg_idx = 0;
while (msg_idx < msgs_to_send) {
memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr));
msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent];
msg[msg_idx].msg_hdr.msg_iovlen = 1;
msg_idx++;
}
- sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].listener_sock,
+ sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL);
/* keep errno before any logging call can overwrite it */
savederrno = errno;
/*
 * partial send: retry with the remaining fragments. A send of zero
 * fragments is retried once (progress flips to 0); a second one in a
 * row gives up with EAGAIN.
 */
if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) {
if ((sent_msgs) || (progress)) {
msgs_to_send = msg_idx - sent_msgs;
prev_sent = prev_sent + sent_msgs;
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if (sent_msgs < 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to send data packet to host %s (%u) link %s:%s (%u): %s",
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id,
strerror(savederrno));
err = -1;
goto out_unlock;
}
/* round-robin: move the link just used to the end of the list and stop after one link */
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
}
out_unlock:
errno = savederrno;
return err;
}
/*
 * Prepare and transmit one packet that was read from a local socket (or
 * handed in by knet_send_sync()).
 *
 * knet_h   - knet handle
 * buf_idx  - index into knet_h->recv_from_sock_buf[] holding the packet
 * inlen    - number of payload bytes in that buffer
 * channel  - data channel the payload belongs to (may be rewritten by
 *            dst_host_filter_fn)
 * is_sync  - non-zero when called from knet_send_sync(); only unicast to a
 *            single destination is accepted in that case
 *
 * The packet is classified (DATA or HOST_INFO), its destinations are
 * resolved and filtered for reachability, the payload is split into
 * fragments of at most data_mtu bytes, each fragment is optionally
 * encrypted and the result is handed to _dispatch_to_links().
 *
 * Returns 0 on success, -1 on failure with errno set (ECANCELED, EFAULT,
 * EINVAL, ENOMSG, E2BIG, EHOSTDOWN, ECHILD or the errno reported by
 * _dispatch_to_links()).
 *
 * For HOST_INFO packets host_cond is signalled on exit, success or failure.
 */
static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
{
	ssize_t outlen, frag_len;
	struct knet_host *dst_host;
	uint16_t dst_host_ids_temp[KNET_MAX_HOST];
	size_t dst_host_ids_entries_temp = 0;
	uint16_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	struct knet_hostinfo *knet_hostinfo;
	struct iovec iov_out[PCKT_FRAG_MAX];
	uint8_t frag_idx;
	unsigned int temp_data_mtu;
	int host_idx;
	int send_mcast = 0;
	struct knet_header *inbuf;
	int savederrno = 0;
	int err = 0;
	int host_locked;

	inbuf = knet_h->recv_from_sock_buf[buf_idx];

	if ((knet_h->enabled != 1) &&
	    (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
		log_debug(knet_h, KNET_SUB_SEND_T, "Received data packet but forwarding is disabled");
		savederrno = ECANCELED;
		err = -1;
		goto out_unlock;
	}

	/*
	 * move this into a separate function to expand on
	 * extra switching rules
	 */
	switch(inbuf->kh_type) {
		case KNET_HEADER_TYPE_DATA:
			if (knet_h->dst_host_filter_fn) {
				bcast = knet_h->dst_host_filter_fn(
						knet_h->dst_host_filter_fn_private_data,
						(const unsigned char *)inbuf->khp_data_userdata,
						inlen,
						KNET_NOTIFY_TX,
						knet_h->host_id,
						knet_h->host_id,
						&channel,
						dst_host_ids_temp,
						&dst_host_ids_entries_temp);
				if (bcast < 0) {
					log_debug(knet_h, KNET_SUB_SEND_T, "Error from dst_host_filter_fn: %d", bcast);
					savederrno = EFAULT;
					err = -1;
					goto out_unlock;
				}

				if ((!bcast) && (!dst_host_ids_entries_temp)) {
					log_debug(knet_h, KNET_SUB_SEND_T, "Message is unicast but no dst_host_ids_entries");
					savederrno = EINVAL;
					err = -1;
					goto out_unlock;
				}
			}
			break;
		case KNET_HEADER_TYPE_HOST_INFO:
			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
				bcast = 0;
				dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
				dst_host_ids_entries_temp = 1;
				/* the destination id travels in network byte order */
				knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
			}
			break;
		default:
			log_warn(knet_h, KNET_SUB_SEND_T, "Receiving unknown messages from socket");
			savederrno = ENOMSG;
			err = -1;
			goto out_unlock;
			break;
	}

	if (is_sync) {
		if ((bcast) ||
		    ((!bcast) && (dst_host_ids_entries_temp > 1))) {
			log_debug(knet_h, KNET_SUB_SEND_T, "knet_send_sync is only supported with unicast packets for one destination");
			savederrno = E2BIG;
			err = -1;
			goto out_unlock;
		}
	}

	/*
	 * check destination hosts before spending time
	 * in fragmenting/encrypting packets to save
	 * time processing data for unreachable hosts.
	 * for unicast, also remap the destination data
	 * to skip unreachable hosts.
	 */
	if (!bcast) {
		dst_host_ids_entries = 0;
		for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
			dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
			if (!dst_host) {
				continue;
			}
			if (dst_host->status.reachable) {
				dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
				dst_host_ids_entries++;
			}
		}
		if (!dst_host_ids_entries) {
			savederrno = EHOSTDOWN;
			err = -1;
			goto out_unlock;
		}
	} else {
		send_mcast = 0;
		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
			if (dst_host->status.reachable) {
				send_mcast = 1;
				break;
			}
		}
		if (!send_mcast) {
			savederrno = EHOSTDOWN;
			err = -1;
			goto out_unlock;
		}
	}

	if (!knet_h->data_mtu) {
		/*
		 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
		 */
		log_debug(knet_h, KNET_SUB_SEND_T,
			  "Received data packet but data MTU is still unknown."
			  " Packet might not be delivered."
			  " Assuming mininum IPv4 mtu (%d)",
			  KNET_PMTUD_MIN_MTU_V4);
		temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
	} else {
		/*
		 * take a copy of the mtu to avoid value changing under
		 * our feet while we are sending a fragmented pckt
		 */
		temp_data_mtu = knet_h->data_mtu;
	}

	/*
	 * prepare the outgoing buffers
	 */
	frag_len = inlen;
	frag_idx = 0;

	inbuf->khp_data_bcast = bcast;
	inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
	inbuf->khp_data_channel = channel;

	while (frag_idx < inbuf->khp_data_frag_num) {
		/*
		 * set the iov_base
		 */
		iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];

		/*
		 * set the len: a full MTU for every fragment but the last one
		 */
		if (frag_len > temp_data_mtu) {
			iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
		} else {
			iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
		}

		/*
		 * copy the frag info on all buffers
		 */
		knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
		knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
		knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
		knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;

		memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
			inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
			iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);

		frag_len = frag_len - temp_data_mtu;
		frag_idx++;
	}

	if (!bcast) {
		/*
		 * unicast: every destination has its own sequence number, so the
		 * fragments are re-stamped (and re-encrypted) per destination
		 */
		for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
			dst_host = knet_h->host_index[dst_host_ids[host_idx]];

			knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx);

			frag_idx = 0;
			while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
				knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;

				if (knet_h->crypto_instance) {
					if (crypto_encrypt_and_sign(
							knet_h,
							(const unsigned char *)knet_h->send_to_links_buf[frag_idx],
							iov_out[frag_idx].iov_len,
							knet_h->send_to_links_buf_crypt[frag_idx],
							&outlen) < 0) {
						log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt unicast packet");
						savederrno = ECHILD;
						err = -1;
						goto out_unlock;
					}
					iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
					iov_out[frag_idx].iov_len = outlen;
				}
				frag_idx++;
			}

			err = _dispatch_to_links(knet_h, dst_host, iov_out);
			savederrno = errno;
			if (err) {
				goto out_unlock;
			}
		}
	} else {
		/*
		 * broadcast: one shared sequence number, fragments are stamped
		 * and encrypted once and then sent to every reachable host
		 */
		knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx);

		frag_idx = 0;
		while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
			knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;

			if (knet_h->crypto_instance) {
				if (crypto_encrypt_and_sign(
						knet_h,
						(const unsigned char *)knet_h->send_to_links_buf[frag_idx],
						iov_out[frag_idx].iov_len,
						knet_h->send_to_links_buf_crypt[frag_idx],
						&outlen) < 0) {
					/* this is the broadcast path: do not report it as unicast */
					log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt broadcast packet");
					savederrno = ECHILD;
					err = -1;
					goto out_unlock;
				}
				iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
				iov_out[frag_idx].iov_len = outlen;
			}
			frag_idx++;
		}

		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
			if (dst_host->status.reachable) {
				err = _dispatch_to_links(knet_h, dst_host, iov_out);
				savederrno = errno;
				if (err) {
					goto out_unlock;
				}
			}
		}
	}

out_unlock:
	if ((inlen > 0) && (inbuf->kh_type == KNET_HEADER_TYPE_HOST_INFO)) {
		host_locked = (pthread_mutex_lock(&knet_h->host_mutex) == 0);
		if (!host_locked) {
			log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get mutex lock");
		}
		/* always wake the waiter; signalling does not require the mutex */
		pthread_cond_signal(&knet_h->host_cond);
		/* never unlock a mutex this thread failed to acquire */
		if (host_locked) {
			pthread_mutex_unlock(&knet_h->host_mutex);
		}
	}
	errno = savederrno;
	return err;
}
/*
 * Synchronously send buff_len bytes from buff on the given data channel.
 *
 * The payload is copied into knet_h->recv_from_sock_buf[0] and pushed
 * through _parse_recv_from_sock() with is_sync set, while holding the
 * global read lock and the TX mutex.
 *
 * Returns 0 on success, -1 on failure with errno set. EINVAL is reported
 * for a NULL handle or buffer, a length of 0 or above KNET_MAX_PACKET_SIZE,
 * a channel outside [0, KNET_DATAFD_MAX) or a channel that is not in use.
 * Lock failures report the pthread error code; anything else is the errno
 * left by _parse_recv_from_sock().
 */
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
	int rc = 0;
	int saved = 0;

	/* argument validation: every rejection is an EINVAL */
	if ((!knet_h) ||
	    (buff == NULL) ||
	    (buff_len <= 0) ||
	    (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) ||
	    (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved) {
		log_err(knet_h, KNET_SUB_SEND_T, "Unable to get read lock: %s",
			strerror(saved));
		errno = saved;
		return -1;
	}

	if (!knet_h->sockfd[channel].in_use) {
		saved = EINVAL;
		rc = -1;
	} else {
		saved = pthread_mutex_lock(&knet_h->tx_mutex);
		if (saved) {
			log_err(knet_h, KNET_SUB_SEND_T, "Unable to get TX mutex lock: %s",
				strerror(saved));
			rc = -1;
		} else {
			/* stage the payload in the first TX buffer and send it */
			knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
			memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);

			rc = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
			saved = errno;

			pthread_mutex_unlock(&knet_h->tx_mutex);
		}
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = saved;
	return rc;
}
+
/*
 * React to an end-of-file condition on a link socket.
 *
 * Logs the event, removes sockfd from the recv_from_links epoll set and
 * then hands the fd to the transport through transport_ops->handle_fd_eof().
 * The fd is not close()d here.
 * NOTE(review): presumably the transport callback owns the actual close;
 * confirm against the transport implementation.
 */
+static void _close_socket(knet_handle_t knet_h, int sockfd)
+{
+ struct epoll_event ev;
+
+ log_err(knet_h, KNET_SUB_LINK_T, "EOF received on socket fd %d", sockfd);
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+
+ ev.events = EPOLLIN;
+ ev.data.fd = sockfd;
+ if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
+ log_err(knet_h, KNET_SUB_LISTENER, "Unable to remove EOFed socket from epoll pool: %s",
+ strerror(errno));
+ }
+
+ /* Tell transport that the FD has been closed */
+ knet_h->transport_ops->handle_fd_eof(knet_h, sockfd);
+}
+
/*
 * Read pending data from a local socket and push it to the links.
 *
 * knet_h  - knet handle
 * sockfd  - fd that epoll reported as readable
 * channel - data channel the fd belongs to; the caller passes -1 for the
 *           host-info socket and may pass KNET_DATAFD_MAX when no channel
 *           matched the fd
 * msg     - pre-initialised mmsghdr vector (PCKT_FRAG_MAX entries) whose
 *           iovecs point at knet_h->recv_from_sock_buf[]
 * type    - header type to stamp on the packets that are read
 *
 * Channels that are not flagged is_socket are read with readv(); every
 * other fd is read with recvmmsg(). Each message read is handed to
 * _parse_recv_from_sock().
 *
 * On read error or EOF the per-channel teardown and sock_notify_fn are only
 * run when channel is a valid index into knet_h->sockfd[]; otherwise the
 * condition is just logged, so sockfd[] is never indexed out of bounds.
 */
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
{
	ssize_t inlen = 0;
	struct iovec iov_in;
	int msg_recv, i;
	int savederrno = 0, docallback = 0;
	int valid_channel = ((channel >= 0) && (channel < KNET_DATAFD_MAX));

	if ((valid_channel) &&
	    (!knet_h->sockfd[channel].is_socket)) {
		memset(&iov_in, 0, sizeof(iov_in));
		iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
		iov_in.iov_len = KNET_MAX_PACKET_SIZE;

		inlen = readv(sockfd, &iov_in, 1);

		if (inlen <= 0) {
			savederrno = errno;
			docallback = 1;
			goto out;
		}

		msg_recv = 1;
		knet_h->recv_from_sock_buf[0]->kh_type = type;
		_parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
	} else {
		msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
		if (msg_recv < 0) {
			inlen = msg_recv;
			savederrno = errno;
			docallback = 1;
			goto out;
		}
		for (i = 0; i < msg_recv; i++) {
			inlen = msg[i].msg_len;
			if (inlen == 0) {
				/* zero-length message: report it and stop processing */
				savederrno = 0;
				docallback = 1;
				goto out;
			}
			knet_h->recv_from_sock_buf[i]->kh_type = type;
			_parse_recv_from_sock(knet_h, i, inlen, channel, 0);
		}
	}

out:
	if (!valid_channel) {
		/*
		 * host-info socket (channel -1) or an fd that matched no data
		 * channel: there is no sockfd[] slot to tear down or report on
		 */
		if (docallback) {
			log_debug(knet_h, KNET_SUB_SEND_T, "Read failure on fd %d that is not bound to a data channel: %s",
				  sockfd, strerror(savederrno));
		}
		return;
	}

	if (inlen < 0) {
		struct epoll_event ev;

		memset(&ev, 0, sizeof(struct epoll_event));

		/* stop polling a data fd that failed, and flag the channel */
		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
			log_err(knet_h, KNET_SUB_SEND_T, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
		} else {
			knet_h->sockfd[channel].has_error = 1;
		}
	}

	if (docallback) {
		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
				       knet_h->sockfd[channel].sockfd[0],
				       channel,
				       KNET_NOTIFY_TX,
				       inlen,
				       savederrno);
	}
}
/*
 * TX worker thread.
 *
 * data is the knet handle. After preparing the receive vector and the
 * constant header fields of the TX buffers, the thread waits on
 * send_to_links_epollfd and, for every readable fd, works out whether it is
 * the host-info socket (channel -1) or a data channel and calls
 * _handle_send_to_links() with the TX mutex held. The global read lock is
 * held while a batch of events is processed.
 *
 * Always returns NULL.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_FRAG_MAX];
	struct mmsghdr msg[PCKT_FRAG_MAX];
	struct iovec iov_in[PCKT_FRAG_MAX];
	int i, nev, type;
	int8_t channel;

	/* clear the whole vector, not just its first element */
	memset(&msg, 0, sizeof(msg));

	/* preparing data buffer */
	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
		iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;

		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));

		msg[i].msg_hdr.msg_name = &address[i];
		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[i].msg_hdr.msg_iov = &iov_in[i];
		msg[i].msg_hdr.msg_iovlen = 1;

		knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
		knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);

		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	while (!shutdown_in_progress(knet_h)) {
		/* maxevents must not exceed the size of events[] */
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock");
			continue;
		}

		/* nev is negative when epoll_wait() fails: the loop is then skipped */
		for (i = 0; i < nev; i++) {
			if (events[i].data.fd == knet_h->hostsockfd[0]) {
				type = KNET_HEADER_TYPE_HOST_INFO;
				channel = -1;
			} else {
				type = KNET_HEADER_TYPE_DATA;
				/* channel ends up as KNET_DATAFD_MAX when no channel owns the fd */
				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
					if ((knet_h->sockfd[channel].in_use) &&
					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
						break;
					}
				}
			}

			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				/*
				 * skip this event only; global_rwlock is still held
				 * and is released once after the loop
				 */
				log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get mutex lock");
				continue;
			}

			_handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	return NULL;
}
/*
* RECV
*/
/*
 * three-way comparison of two timestamps, seconds first and
 * nanoseconds as the tie breaker:
 *
 *  1 when a is later than b
 * -1 when a is earlier than b
 *  0 when both are the same
 */
static inline int timecmp(struct timespec a, struct timespec b)
{
	if (a.tv_sec > b.tv_sec) {
		return 1;
	}
	if (a.tv_sec < b.tv_sec) {
		return -1;
	}

	/* same second: let the nanoseconds decide */
	if (a.tv_nsec > b.tv_nsec) {
		return 1;
	}
	if (a.tv_nsec < b.tv_nsec) {
		return -1;
	}
	return 0;
}
/*
 * select the defrag buffer of the sending host that should receive the
 * fragment in inbuf.
 *
 * returns an index (0 to KNET_MAX_LINK - 1) into the host's defrag_buf
 * array, or -1 with errno set to ETIME when the packet sequence number
 * has already been seen and its buffer is gone.
 */
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
	int idx, victim;

	/*
	 * first choice: a buffer that is already collecting fragments
	 * for this very seq_num
	 */
	for (idx = 0; idx < KNET_MAX_LINK; idx++) {
		if ((src_host->defrag_buf[idx].in_use) &&
		    (src_host->defrag_buf[idx].pckt_seq == inbuf->khp_data_seq_num)) {
			return idx;
		}
	}

	/*
	 * nobody is working on this seq_num: it is either brand new or its
	 * buffer has been reclaimed. The defrag circular buffer tells the two
	 * apart; a pckt that has been seen before has expired (ETIME) and is
	 * not worth defragging again.
	 */
	if (!_seq_num_lookup(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1)) {
		errno = ETIME;
		return -1;
	}

	/*
	 * new pckt: remember that we have seen it
	 */
	_seq_num_set(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1);

	/*
	 * second choice: any buffer that is currently free
	 */
	for (idx = 0; idx < KNET_MAX_LINK; idx++) {
		if (!src_host->defrag_buf[idx].in_use) {
			return idx;
		}
	}

	/*
	 * last resort: every buffer is busy, so reclaim the one that has
	 * not been updated for the longest time. Slot 0 is the initial
	 * candidate, so the scan can start from slot 1.
	 */
	victim = 0;
	for (idx = 1; idx < KNET_MAX_LINK; idx++) {
		if (timecmp(src_host->defrag_buf[idx].last_update, src_host->defrag_buf[victim].last_update) < 0) {
			victim = idx;
		}
	}

	/* hand it out as a free buffer so that the caller wipes it */
	src_host->defrag_buf[victim].in_use = 0;
	return victim;
}
/*
 * Store one fragment of a fragmented packet and, once every fragment has
 * arrived, rebuild the full payload in place.
 *
 * knet_h - knet handle
 * inbuf  - received packet; kh_node must already identify a known host
 * len    - in: payload length of this fragment (header already stripped)
 *          out: length of the reassembled payload when 0 is returned
 *
 * Returns 0 when the packet is complete (payload copied back into
 * inbuf->khp_data_userdata, *len updated), 1 when the caller has nothing to
 * deliver yet (more fragments needed, duplicate, expired or invalid
 * fragment).
 *
 * The fragment header comes from the network, so the sequence fields and
 * the resulting buffer offsets are validated before they are used to index
 * frag_map or to copy into the defrag buffer.
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;
	int is_last;
	size_t frag_size, offset, total_len;

	/*
	 * fragment sequence numbers are 1-based and cannot exceed the number
	 * of fragments; the payload must fit in the defrag buffer. frag_map is
	 * indexed by the sequence number, so that must fit as well (sizeof
	 * does not evaluate its operand, defrag_buf is not dereferenced here).
	 */
	if ((inbuf->khp_data_frag_seq < 1) ||
	    (inbuf->khp_data_frag_seq > inbuf->khp_data_frag_num) ||
	    (inbuf->khp_data_frag_seq >= (sizeof(defrag_buf->frag_map) / sizeof(defrag_buf->frag_map[0]))) ||
	    (*len <= 0) ||
	    (*len > KNET_MAX_PACKET_SIZE)) {
		log_debug(knet_h, KNET_SUB_LINK_T, "Invalid fragment header received, dropping fragment");
		return 1;
	}

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		if (errno == ETIME) {
			log_debug(knet_h, KNET_SUB_LINK_T, "Defrag buffer expired");
		}
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 * work out where this fragment lands before touching the buffer:
	 * a regular fragment defines the fragment size, the last one relies
	 * on the size recorded so far (0 when it is the first to arrive)
	 */
	is_last = (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num);
	if (is_last) {
		frag_size = defrag_buf->frag_size;
	} else {
		frag_size = *len;
	}
	offset = (size_t)(inbuf->khp_data_frag_seq - 1) * frag_size;
	if ((offset + (size_t)*len) > KNET_MAX_PACKET_SIZE) {
		log_debug(knet_h, KNET_SUB_LINK_T, "Fragment does not fit in the defrag buffer, dropping fragment");
		return 1;
	}

	/*
	 * we need to handle the last packet with gloves due to its different size
	 */
	if (is_last) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
				inbuf->khp_data_userdata,
				*len);
		}
	} else {
		defrag_buf->frag_size = *len;
	}

	memmove(defrag_buf->buf + offset,
		inbuf->khp_data_userdata, *len);

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/*
		 * recalculate packet length and make sure that the rebuilt
		 * packet still fits: fragments of inconsistent sizes could
		 * otherwise push the final copy out of bounds
		 */
		total_len = ((size_t)(inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
		if (total_len > KNET_MAX_PACKET_SIZE) {
			log_debug(knet_h, KNET_SUB_LINK_T, "Reassembled packet is too big, dropping packet");
			defrag_buf->in_use = 0;
			return 1;
		}

		/*
		 * special case the last pckt: move it from its parking spot at
		 * the end of the buffer to its real offset
		 */
		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		*len = total_len;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
/*
 * Process one packet received from a link socket.
 *
 * knet_h  - knet handle
 * address - source address reported by the receive call for this packet
 * ind     - index of the packet in knet_h->recv_from_links_buf[]
 * len     - number of bytes received
 *
 * The packet is decrypted/authenticated when crypto is configured, checked
 * for minimum length, header version and known source host, and then
 * handled according to kh_type:
 *   HOST_INFO / DATA - dedup by sequence number, defragment, then either
 *                      write the payload to the local channel socket (DATA)
 *                      or update the link state (HOST_INFO)
 *   PING             - answered with a PONG
 *   PONG             - updates latency and may bring the link up
 *   PMTUD            - answered with a PMTUD_REPLY
 *   PMTUD_REPLY      - records the received size and wakes the PMTUd thread
 * Anything else is silently dropped.
 */
static void _parse_recv_from_links(knet_handle_t knet_h, struct sockaddr_storage *address, int ind, ssize_t len)
{
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
uint16_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct timespec recvtime;
struct knet_header *inbuf = knet_h->recv_from_links_buf[ind];
unsigned char *outbuf = (unsigned char *)knet_h->recv_from_links_buf[ind];
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
/* with crypto enabled, the rest of the function works on the decrypted copy */
if (knet_h->crypto_instance) {
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to decrypt/auth packet");
return;
}
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
}
if (len < (KNET_HEADER_SIZE + 1)) {
- log_debug(knet_h, KNET_SUB_LINK_T, "Packet is too short");
+ log_debug(knet_h, KNET_SUB_LINK_T, "Packet is too short: %ld", len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_LINK_T, "Packet version does not match");
return;
}
/* kh_node is converted to host byte order in place and used as host index */
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to find source host for this packet");
return;
}
/*
 * src_link is only resolved for packet types matching KNET_HEADER_TYPE_PMSK.
 * NOTE(review): the PING/PONG/PMTUD cases below dereference src_link, so
 * they presumably all carry a PMSK bit - confirm against onwire.h.
 */
src_link = NULL;
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
/* dynamic links learn the peer address from the packets they receive */
if (src_link->dynamic == KNET_LINK_DYNIP) {
if (memcmp(&src_link->dst_addr, address, sizeof(struct sockaddr_storage)) != 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, address, sizeof(struct sockaddr_storage));
if (getnameinfo((const struct sockaddr *)&src_link->dst_addr, sizeof(struct sockaddr_storage),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN,
NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
}
}
src_link->status.dynconnected = 1;
}
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
/*
 * NOTE(review): channel is taken from the received header and later
 * indexes knet_h->sockfd[] without a visible range check - confirm
 * whether it is validated elsewhere.
 */
channel = inbuf->khp_data_channel;
if (!_seq_num_lookup(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 0)) {
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_LINK_T, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
int host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
log_debug(knet_h, KNET_SUB_LINK_T, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
log_debug(knet_h, KNET_SUB_LINK_T, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
log_debug(knet_h, KNET_SUB_LINK_T,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
/* deliver the payload (header stripped) to the local channel socket */
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
return;
}
/* the sequence number is only marked as delivered after a complete write */
if (outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, bcast, inbuf->khp_data_seq_num, 0)) {
return;
}
_seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
src_link = src_host->link +
(knet_hostinfo->khip_link_status_link_id % KNET_MAX_LINK);
/*
* basically if the node is coming back to life from a crash
* we should receive a host info where local previous status == remote current status
* and so we can detect that node is showing up again
* we need to clear cbuffers and notify the node of our status by resending our host info
*/
if ((src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) &&
(src_link->remoteconnected == knet_hostinfo->khip_link_status_status)) {
src_link->host_info_up_sent = 0;
}
src_link->remoteconnected = knet_hostinfo->khip_link_status_status;
if (src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_DOWN) {
/*
* if a host is disconnecting clean, we note that in donnotremoteupdate
* so that we don't send host info back immediately but we wait
* for the node to send an update when it's alive again
*/
src_link->host_info_up_sent = 0;
src_link->donnotremoteupdate = 1;
} else {
src_link->donnotremoteupdate = 0;
}
log_debug(knet_h, KNET_SUB_LINK_T, "host message up/down. from host: %u link: %u remote connected: %u",
src_host->host_id,
src_link->link_id,
src_link->remoteconnected);
if (_host_dstcache_update_async(knet_h, src_host)) {
log_debug(knet_h, KNET_SUB_LINK_T,
"Unable to update switch cache for host: %u link: %u remote connected: %u)",
src_host->host_id,
src_link->link_id,
src_link->remoteconnected);
}
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_LINK_T, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
/* turn the received ping into a pong in place and send it back */
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
- if (sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ if (sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage)) != outlen) {
log_debug(knet_h, KNET_SUB_LINK_T,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
- src_link->listener_sock, errno, strerror(errno),
+ src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
break;
case KNET_HEADER_TYPE_PONG:
/* the pong echoes the timestamp of our ping: use it to measure latency */
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
/* weighted average of the previous latency and the new sample */
src_link->status.latency =
((src_link->status.latency * src_link->latency_exp) +
((latency_last / 1000llu) *
(src_link->latency_fix - src_link->latency_exp))) /
src_link->latency_fix;
/* a down link comes up once pong_count in-time pongs have been counted */
if (src_link->status.latency < src_link->pong_timeout) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_LINK_T, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
break;
case KNET_HEADER_TYPE_PMTUD:
/* turn the received probe into a reply in place and send it back */
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
- if (sendto(src_link->listener_sock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ if (sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage)) != outlen) {
log_debug(knet_h, KNET_SUB_LINK_T,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
- src_link->listener_sock, errno, strerror(errno),
+ src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
break;
case KNET_HEADER_TYPE_PMTUD_REPLY:
/* record the size that made it through and wake whoever waits on pmtud_cond */
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
break;
default:
return;
}
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) /* Reads up to PCKT_FRAG_MAX messages from sockfd with one recvmmsg() call and parses each of them while holding the global read lock. */
{
int i, msg_recv;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get read lock");
return;
}
msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
if (msg_recv < 0) {
log_err(knet_h, KNET_SUB_LINK_T, "No message received from recvmmsg: %s", strerror(errno));
goto exit_unlock;
}
+ if (msg_recv == 0) { /* nothing was received: the socket is handed to _close_socket(); the loop below then has nothing to iterate */
+ _close_socket(knet_h, sockfd);
+ }
+
for (i = 0; i < msg_recv; i++) {
- _parse_recv_from_links(knet_h, (struct sockaddr_storage *)&msg[i].msg_hdr.msg_name, i, msg[i].msg_len);
+ if (msg[i].msg_len == 0) { /* zero-length message: close the socket and drop the rest of this batch. NOTE(review): an empty datagram is valid on a datagram socket; presumably aimed at connection-oriented transports - confirm */
+ _close_socket(knet_h, sockfd);
+ goto exit_unlock;
+ }
+ else {
+ _parse_recv_from_links(knet_h, (struct sockaddr_storage *)&msg[i].msg_hdr.msg_name, i, msg[i].msg_len); /* NOTE(review): this passes the address of the msg_name pointer field, not the address buffer it points to; msg[i].msg_hdr.msg_name looks intended - confirm */
+ }
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
/*
 * Receive thread: waits on the recv_from_links epoll set and passes every
 * ready socket to _handle_recv_from_links() until shutdown is requested.
 *
 * data is the knet_handle_t the thread works for. Always returns NULL.
 */
void *_handle_recv_from_links_thread(void *data)
{
	int i, j, nev;
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_FRAG_MAX];
	struct mmsghdr msg[PCKT_FRAG_MAX];
	struct iovec iov_in[PCKT_FRAG_MAX];

	/* clear the whole array, not only its first element */
	memset(msg, 0, sizeof(msg));

	/* one receive buffer and one source-address slot per message */
	for (i = 0; i < PCKT_FRAG_MAX; i++) {
		iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
		iov_in[i].iov_len = KNET_DATABUFSIZE;

		msg[i].msg_hdr.msg_name = &address[i];
		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[i].msg_hdr.msg_iov = &iov_in[i];
		msg[i].msg_hdr.msg_iovlen = 1;
	}

	while (!shutdown_in_progress(knet_h)) {
		/* a negative return (error) simply skips the loop below */
		nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);

		for (i = 0; i < nev; i++) {
			/*
			 * msg_namelen is a value-result field: a receive call
			 * overwrites it with the length of the address it stored.
			 * Restore the full buffer size before every call, otherwise
			 * a short (IPv4) address shrinks the space offered to a
			 * later, longer (IPv6) one.
			 */
			for (j = 0; j < PCKT_FRAG_MAX; j++) {
				msg[j].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
			}

			_handle_recv_from_links(knet_h, events[i].data.fd, msg);
		}
	}

	return NULL;
}
diff --git a/libknet/transport_common.c b/libknet/transport_common.c
new file mode 100644
index 00000000..8455a476
--- /dev/null
+++ b/libknet/transport_common.c
@@ -0,0 +1,140 @@
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <malloc.h>
+#include <arpa/inet.h>
+
+#include "libknet.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "common.h"
+#include "transports.h"
+
+#ifdef DEBUG
+/*
+ * Formats the address stored in ss as a printable string (debug builds).
+ *
+ * AF_INET is printed as IPv4; every other family is treated as IPv6.
+ * The result lives in a static buffer that each call overwrites, so this
+ * is deliberately light and not thread-safe: it is for debugging only.
+ * Returns the value of inet_ntop(), i.e. NULL when the conversion fails.
+ */
+const char *_transport_print_ip(const struct sockaddr_storage *ss)
+{
+	static char printbuf[INET6_ADDRSTRLEN];
+
+	if (ss->ss_family == AF_INET) {
+		/* keep the const qualifier of ss instead of casting it away */
+		const struct sockaddr_in *in4 = (const struct sockaddr_in *)ss;
+
+		return inet_ntop(AF_INET, &in4->sin_addr, printbuf, sizeof(printbuf));
+	}
+	else {
+		const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)ss;
+
+		return inet_ntop(AF_INET6, &in6->sin6_addr, printbuf, sizeof(printbuf));
+	}
+}
+#else
+/*
+ * Non-debug builds do not format the address: a fixed placeholder string
+ * is returned for every input.
+ */
+const char *_transport_print_ip(const struct sockaddr_storage *ss)
+{
+	(void)ss;	/* unused outside debug builds */
+	return "node";
+}
+#endif
+
+/*
+ * Applies the socket options shared by the knet transports to sock.
+ *
+ * knet_h   handle used for logging
+ * sock     socket to configure
+ * address  only its ss_family is read: AF_INET6 selects IPV6_V6ONLY and
+ *          IPV6_MTU_DISCOVER, any other family selects IP_MTU_DISCOVER
+ * type     transport name inserted in the error messages
+ *
+ * In order: forces the receive and send buffer sizes, enables FREEBIND,
+ * sets the family-specific options, enables SO_REUSEADDR and finally calls
+ * _fdset_cloexec() and _fdset_nonblock() on the descriptor.
+ *
+ * Returns 0 on success. The first failure is logged and -1 is returned
+ * with errno set to the value reported by the failing call.
+ */
+int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type)
+{
+	int err = -1;
+	int value;
+	int savederrno = 0;
+
+	value = KNET_RING_RCVBUFF;
+	if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s receive buffer: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	/* NOTE(review): the send buffer re-uses KNET_RING_RCVBUFF - confirm this is intended */
+	value = KNET_RING_RCVBUFF;
+	if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s send buffer: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	value = 1;
+	if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value)) <0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set FREEBIND on %s socket: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	if (address->ss_family == AF_INET6) {
+		value = 1;
+		if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
+			       &value, sizeof(value)) < 0) {
+			savederrno = errno;
+			log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s IPv6 only: %s",
+				type, strerror(savederrno));
+			goto exit_error;
+		}
+
+		value = IPV6_PMTUDISC_PROBE;
+		if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) {
+			savederrno = errno;
+			log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set PMTUDISC on %s socket: %s",
+				type, strerror(savederrno));
+			goto exit_error;
+		}
+	} else {
+		value = IP_PMTUDISC_PROBE;
+		if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) {
+			savederrno = errno;
+			log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set PMTUDISC on %s socket: %s",
+				type, strerror(savederrno));
+			goto exit_error;
+		}
+	}
+
+	value = 1;
+	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s reuseaddr: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	if (_fdset_cloexec(sock)) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s CLOEXEC socket opts: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	if (_fdset_nonblock(sock)) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_TRANSPORT_T, "Unable to set %s NONBLOCK socket opts: %s",
+			type, strerror(savederrno));
+		goto exit_error;
+	}
+
+	err = 0;
+
+exit_error:
+	if (err < 0) {
+		/* logging may have overwritten errno: report the original failure */
+		errno = savederrno;
+	}
+	return err;
+}
diff --git a/libknet/transport_udp.c b/libknet/transport_udp.c
new file mode 100644
index 00000000..07e320b0
--- /dev/null
+++ b/libknet/transport_udp.c
@@ -0,0 +1,177 @@
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <malloc.h>
+#include <qb/qblist.h>
+
+#include "libknet.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "common.h"
+#include "transports.h"
+
+#define KNET_PMTUD_UDP_OVERHEAD 8
+
+typedef struct udp_handle_info { /* Per-handle state of the UDP transport; allocated by udp_handle_allocate(). */
+ knet_handle_t knet_h; /* handle this state was allocated for */
+ struct qb_list_head links_list; /* head of the list of udp_link_info entries, linked through their 'list' member */
+} udp_handle_info_t;
+
+typedef struct udp_link_info { /* State of one bound UDP socket; allocated by udp_link_allocate(). */
+ knet_transport_link_t transport; /* first member; its address is what udp_link_allocate() hands back for a newly created entry */
+ knet_handle_t knet_handle; /* handle passed to udp_link_allocate() */
+ int socket_fd; /* UDP socket bound to local_address and added to the receive epoll set */
+ udp_handle_info_t *handle_info; /* back-pointer to the per-handle state whose links_list holds this entry */
+ struct qb_list_head list; /* list node inside handle_info->links_list */
+ struct sockaddr_storage local_address; /* copy of the bind address; compared to decide whether an existing socket can be re-used */
+} udp_link_info_t;
+
+/*
+ * Creates the per-handle state of the UDP transport.
+ *
+ * On success the new udp_handle_info_t, holding knet_h and an empty
+ * links_list, is stored in *transport and 0 is returned.
+ * Returns -1, leaving *transport untouched, when the allocation fails.
+ */
+static int udp_handle_allocate(knet_handle_t knet_h, knet_transport_t *transport)
+{
+	udp_handle_info_t *udp_state = malloc(sizeof(udp_handle_info_t));
+
+	if (udp_state == NULL) {
+		return -1;
+	}
+
+	udp_state->knet_h = knet_h;
+	qb_list_init(&udp_state->links_list);
+	*transport = udp_state;
+
+	return 0;
+}
+
+static int udp_handle_free(knet_handle_t knet_h, knet_transport_t transport) /* Frees the memory transport points to; knet_h is unused and 0 is always returned. */
+{
+ free(transport); /* NOTE(review): presumably the udp_handle_info_t from udp_handle_allocate(); entries still on its links_list are not released here - confirm callers free every link first */
+ return 0;
+}
+
+static int udp_link_listener_start(knet_handle_t knet_h, knet_transport_link_t transport_link, /* Stub for the UDP transport: no argument is used and 0 is always returned. */
+ uint8_t link_id,
+ struct sockaddr_storage *address, struct sockaddr_storage *dst_address)
+{
+ return 0;
+}
+
+
+/*
+ * Provides the UDP socket for a link that uses the local address `address`.
+ *
+ * knet_h          handle; supplies logging and the receive epoll set
+ * transport       the udp_handle_info_t created by udp_handle_allocate()
+ * link, link_id   not used by the UDP transport
+ * transport_link  receives the udp_link_info_t describing the socket
+ * address         local address; selects the socket family and is bound
+ * dst_address     not used here
+ * send_sock       receives the socket descriptor
+ *
+ * When an entry with an identical local address already exists, its socket
+ * is re-used and no new entry is created. Otherwise a new datagram socket
+ * is created, configured, bound to `address`, added to the receive epoll
+ * set and recorded in the handle's links_list.
+ *
+ * Returns 0 on success. On failure returns -1 with errno describing the
+ * error; the partially built entry and its socket are released and the
+ * output parameters are left untouched.
+ */
+static int udp_link_allocate(knet_handle_t knet_h, knet_transport_t transport,
+			     struct knet_link *link,
+			     knet_transport_link_t *transport_link,
+			     uint8_t link_id,
+			     struct sockaddr_storage *address, struct sockaddr_storage *dst_address,
+			     int *send_sock)
+{
+	int sock = -1;
+	int savederrno = 0;
+	struct epoll_event ev;
+	udp_link_info_t *info;
+	udp_handle_info_t *handle_info = transport;
+
+	/* Only allocate a new link if the local address is different */
+	qb_list_for_each_entry(info, &handle_info->links_list, list) {
+		/* NOTE(review): memcmp() also compares padding bytes of the storage - confirm callers zero it */
+		if (memcmp(&info->local_address, address, sizeof(struct sockaddr_storage)) == 0) {
+
+			log_debug(knet_h, KNET_SUB_UDP_LINK_T, "Re-using existing UDP socket for new link");
+			*send_sock = info->socket_fd;
+			*transport_link = info;
+			return 0;
+		}
+	}
+
+	info = malloc(sizeof(udp_link_info_t));
+	if (!info) {
+		return -1;
+	}
+	info->knet_handle = knet_h;
+	info->handle_info = handle_info;
+
+	sock = socket(address->ss_family, SOCK_DGRAM, 0);
+	if (sock < 0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_LISTENER, "Unable to create listener socket: %s",
+			strerror(savederrno));
+		goto exit_error;
+	}
+
+	/*
+	 * The socket was created with the family of the local address, so the
+	 * family-specific options must be chosen from that same address.
+	 */
+	if (_configure_transport_socket(knet_h, sock, address, "UDP") < 0) {
+		savederrno = errno;
+		goto exit_error;
+	}
+
+	if (bind(sock, (struct sockaddr *)address, sizeof(struct sockaddr_storage)) < 0) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_UDP_LINK_T, "Unable to bind listener socket: %s",
+			strerror(savederrno));
+		goto exit_error;
+	}
+
+	memset(&ev, 0, sizeof(struct epoll_event));
+	ev.events = EPOLLIN;
+	ev.data.fd = sock;
+
+	if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, sock, &ev)) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_UDP_LINK_T, "Unable to add listener to epoll pool: %s",
+			strerror(savederrno));
+		goto exit_error;
+	}
+
+	memcpy(&info->local_address, address, sizeof(struct sockaddr_storage));
+	qb_list_add(&info->list, &handle_info->links_list);
+	info->socket_fd = sock;
+	*transport_link = &info->transport;
+	*send_sock = sock;
+	return 0;
+
+exit_error:
+	/* nothing was published yet: release the socket and the entry */
+	if (sock >= 0) {
+		close(sock);
+	}
+	free(info);
+	errno = savederrno;
+	return -1;
+}
+
+static int udp_link_free(knet_transport_link_t transport) /* Unlinks the udp_link_info_t behind transport from its handle's links_list; always returns 0. */
+{
+ udp_link_info_t *info = transport;
+ udp_handle_info_t *handle_info = info->handle_info;
+
+ qb_list_del(&info->list); /* NOTE(review): socket_fd is neither closed nor removed from the epoll set here - presumably done by the caller, verify */
+
+ if (qb_list_empty(&handle_info->links_list)) { /* NOTE(review): the entry is freed only when it was the last one on the list, otherwise it stays allocated; udp_link_allocate() can also hand one entry to several links, so a later call for a shared entry would touch freed memory - confirm the intended lifetime */
+ free(transport);
+ }
+ return 0;
+}
+
+static int udp_link_get_mtu_overhead(knet_transport_link_t transport) /* Returns the constant KNET_PMTUD_UDP_OVERHEAD (8, presumably the UDP header size); transport is unused. */
+{
+ return KNET_PMTUD_UDP_OVERHEAD;
+}
+
+static knet_transport_ops_t udp_transport_ops = { /* Operations table of the UDP transport; get_udp_transport() returns a pointer to it. */
+ .handle_allocate = udp_handle_allocate,
+ .handle_free = udp_handle_free,
+
+ .link_allocate = udp_link_allocate,
+ .link_listener_start = udp_link_listener_start,
+ .link_free = udp_link_free,
+ .link_get_mtu_overhead = udp_link_get_mtu_overhead,
+ .transport_name = "UDP",
+};
+
+/*
+ * Returns the operations table of the UDP transport (udp_transport_ops).
+ * The table has static storage: the caller must not free it.
+ *
+ * The parameter list is spelled (void) to match the declaration in
+ * transports.h and to give the definition a real prototype.
+ */
+knet_transport_ops_t *get_udp_transport(void)
+{
+	return &udp_transport_ops;
+}
diff --git a/libknet/transports.h b/libknet/transports.h
new file mode 100644
index 00000000..5a0cbd03
--- /dev/null
+++ b/libknet/transports.h
@@ -0,0 +1,8 @@
+#ifndef KNET_TRANSPORTS_H
+#define KNET_TRANSPORTS_H
+
+/*
+ * Declarations shared by the knet transports.
+ * knet_handle_t and knet_transport_ops_t are not declared here: the
+ * including file must pull in their definitions first.
+ */
+
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+
+/* Returns the operations table of the UDP transport. */
+knet_transport_ops_t *get_udp_transport(void);
+
+/*
+ * Returns a printable form of the address in ss when built with DEBUG,
+ * the fixed string "node" otherwise. The debug variant uses a static
+ * buffer and is not thread-safe.
+ */
+const char *_transport_print_ip(const struct sockaddr_storage *ss);
+
+/*
+ * Applies the common transport socket options to sock; address selects the
+ * IPv4 or IPv6 specific options and type names the transport in the error
+ * messages. Returns 0 on success, -1 after logging the failure.
+ */
+int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type);
+
+#endif
+

File Metadata

Mime Type
text/x-diff
Expires
Wed, Jun 25, 4:58 AM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952218
Default Alt Text
(188 KB)

Event Timeline