Page Menu
Home
ClusterLabs Projects
Search
Configure Global Search
Log In
Files
F3153874
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
123 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index 6d8e6899..c570e47f 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,96 +1,96 @@
#
# Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
SYMFILE = libknet_exported_syms
EXTRA_DIST = $(SYMFILE)
SUBDIRS = . tests
libversion = 0:0:0
# override global LIBS that pulls in lots of craft we don't need here
LIBS =
sources = \
common.c \
compat.c \
compress.c \
compress_zlib.c \
compress_lz4.c \
compress_lzo2.c \
compress_lzma.c \
compress_bzip2.c \
crypto.c \
crypto_nss.c \
handle.c \
host.c \
- link.c \
+ links.c \
logging.c \
netutils.c \
threads_common.c \
threads_dsthandler.c \
threads_heartbeat.c \
threads_pmtud.c \
threads_rx.c \
threads_tx.c \
transport_udp.c \
transport_sctp.c \
transport_loopback.c \
transport_common.c
include_HEADERS = libknet.h
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libknet.pc
noinst_HEADERS = \
common.h \
compat.h \
compress.h \
compress_zlib.h \
compress_lz4.h \
compress_lzo2.h \
compress_lzma.h \
compress_bzip2.h \
crypto.h \
crypto_nss.h \
host.h \
internals.h \
- link.h \
+ links.h \
logging.h \
netutils.h \
onwire.h \
threads_common.h \
threads_dsthandler.h \
threads_heartbeat.h \
threads_pmtud.h \
threads_rx.h \
threads_tx.h \
transports.h
lib_LTLIBRARIES = libknet.la
libknet_la_SOURCES = $(sources)
libknet_la_CFLAGS = $(nss_CFLAGS) \
$(zlib_CFLAGS) $(liblz4_CFLAGS) $(lzo2_CFLAGS) $(liblzma_CFLAGS) $(bzip2_CFLAGS)
EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE)
libknet_la_LDFLAGS = -Wl,--version-script=$(srcdir)/$(SYMFILE) \
--export-dynamic \
-version-number $(libversion)
libknet_la_LIBADD = $(dl_LIBS) $(pthread_LIBS) $(rt_LIBS) $(m_LIBS)
diff --git a/libknet/link.c b/libknet/links.c
similarity index 99%
rename from libknet/link.c
rename to libknet/links.c
index e7e6bfd0..8cee9185 100644
--- a/libknet/link.c
+++ b/libknet/links.c
@@ -1,1068 +1,1068 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include "internals.h"
#include "logging.h"
-#include "link.h"
+#include "links.h"
#include "transports.h"
#include "host.h"
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled, unsigned int connected)
{
struct knet_link *link = &knet_h->host_index[host_id]->link[link_id];
if ((link->status.enabled == enabled) &&
(link->status.connected == connected))
return 0;
link->status.enabled = enabled;
link->status.connected = connected;
_host_dstcache_update_async(knet_h, knet_h->host_index[host_id]);
if ((link->status.dynconnected) &&
(!link->status.connected))
link->status.dynconnected = 0;
if (connected) {
time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
link->status.stats.up_count++;
if (++link->status.stats.last_up_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_up_time_index = 0;
}
} else {
time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]);
link->status.stats.down_count++;
if (++link->status.stats.last_down_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_down_time_index = 0;
}
}
return 0;
}
int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint64_t flags)
{
int savederrno = 0, err = 0, i;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return -1;
}
if (!knet_h->transport_ops[transport]) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
if (knet_h->host_id == host_id && knet_h->has_loop_link) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) {
for (i=0; i<KNET_MAX_LINK; i++) {
if (host->link[i].configured) {
log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured.");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
}
}
link = &host->link[link_id];
if (link->configured != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
} else {
link->dynamic = KNET_LINK_STATIC;
memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
}
link->transport_type = transport;
link->transport_connected = 0;
link->proto_overhead = knet_h->transport_ops[link->transport_type]->transport_mtu_overhead;
link->configured = 1;
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \
((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000);
link->flags = flags;
if (knet_h->transport_ops[link->transport_type]->transport_link_set_config(knet_h, link) < 0) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured",
host_id, link_id);
if (transport == KNET_TRANSPORT_LOOPBACK) {
knet_h->has_loop_link = 1;
knet_h->loop_link = link_id;
host->status.reachable = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic,
uint64_t *flags)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (!dynamic) {
errno = EINVAL;
return -1;
}
if (!transport) {
errno = EINVAL;
return -1;
}
if (!flags) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) {
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage));
*transport = link->transport_type;
*flags = link->flags;
if (link->dynamic == KNET_LINK_STATIC) {
*dynamic = 0;
memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage));
} else {
*dynamic = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (link->configured != 1) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((knet_h->transport_ops[link->transport_type]->transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
if (knet_h->has_loop_link && link_id == knet_h->loop_link) {
knet_h->has_loop_link = 0;
if (host->active_link_entries == 0) {
host->status.reachable = 0;
}
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
savederrno = errno;
if (enabled) {
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int *enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!enabled) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*enabled = link->status.enabled;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (pong_count < 1) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->pong_count = pong_count;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u pong count update: %u",
host_id, link_id, link->pong_count);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!pong_count) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*pong_count = link->pong_count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->ping_interval = interval * 1000; /* microseconds */
link->pong_timeout = timeout * 1000; /* microseconds */
link->latency_fix = precision;
link->latency_exp = precision - \
((link->ping_interval * precision) / 8000000);
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
host_id, link_id, link->ping_interval, link->pong_timeout, precision);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*interval = link->ping_interval / 1000; /* microseconds */
*timeout = link->pong_timeout / 1000;
*precision = link->latency_fix;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
uint8_t old_priority;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
old_priority = link->priority;
if (link->priority == priority) {
err = 0;
goto exit_unlock;
}
link->priority = priority;
if (_host_dstcache_update_sync(knet_h, host)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_LINK,
"Unable to update link priority (host: %u link: %u priority: %u): %s",
host_id, link_id, link->priority, strerror(savederrno));
link->priority = old_priority;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u priority set to: %u",
host_id, link_id, link->priority);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!priority) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*priority = link->priority;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *link_ids, size_t *link_ids_entries)
{
int savederrno = 0, err = 0, i, count = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!link_ids) {
errno = EINVAL;
return -1;
}
if (!link_ids_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
for (i = 0; i < KNET_MAX_LINK; i++) {
link = &host->link[i];
if (!link->configured) {
continue;
}
link_ids[count] = i;
count++;
}
*link_ids_entries = count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct knet_link_status *status, size_t struct_size)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &link->status, struct_size);
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
status->stats.rx_ping_packets +
status->stats.rx_pong_packets +
status->stats.rx_pmtu_packets;
status->stats.tx_total_packets =
status->stats.tx_data_packets +
status->stats.tx_ping_packets +
status->stats.tx_pong_packets +
status->stats.tx_pmtu_packets;
status->stats.rx_total_bytes =
status->stats.rx_data_bytes +
status->stats.rx_ping_bytes +
status->stats.rx_pong_bytes +
status->stats.rx_pmtu_bytes;
status->stats.tx_total_bytes =
status->stats.tx_data_bytes +
status->stats.tx_ping_bytes +
status->stats.tx_pong_bytes +
status->stats.tx_pmtu_bytes;
status->stats.tx_total_errors =
status->stats.tx_data_errors +
status->stats.tx_ping_errors +
status->stats.tx_pong_errors +
status->stats.tx_pmtu_errors;
status->stats.tx_total_retries =
status->stats.tx_data_retries +
status->stats.tx_ping_retries +
status->stats.tx_pong_retries +
status->stats.tx_pmtu_retries;
/* Tell the caller our full size in case they have an old version */
status->size = sizeof(struct knet_link_status);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
diff --git a/libknet/link.h b/libknet/links.h
similarity index 100%
rename from libknet/link.h
rename to libknet/links.h
diff --git a/libknet/tests/api_knet_link_set_config.c b/libknet/tests/api_knet_link_set_config.c
index 4785f38d..2334d696 100644
--- a/libknet/tests/api_knet_link_set_config.c
+++ b/libknet/tests/api_knet_link_set_config.c
@@ -1,271 +1,271 @@
/*
* Copyright (C) 2016 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "libknet.h"
#include "internals.h"
-#include "link.h"
+#include "links.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
int logfds[2];
struct sockaddr_storage src, dst;
struct knet_link_status link_status;
memset(&src, 0, sizeof(struct sockaddr_storage));
if (knet_strtoaddr("127.0.0.1", "50000", &src, sizeof(struct sockaddr_storage)) < 0) {
printf("Unable to convert src to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
memset(&dst, 0, sizeof(struct sockaddr_storage));
if (knet_strtoaddr("127.0.0.1", "50001", &dst, sizeof(struct sockaddr_storage)) < 0) {
printf("Unable to convert dst to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
printf("Test knet_link_set_config incorrect knet_h\n");
if ((!knet_link_set_config(NULL, 1, 0, KNET_TRANSPORT_UDP, &src, &dst, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_new(1, logfds[1], KNET_LOG_DEBUG);
if (!knet_h) {
printf("knet_handle_new failed: %s\n", strerror(errno));
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with unconfigured host_id\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &src, &dst, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid host_id or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with bad transport type\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_MAX_TRANSPORTS+1, &src, &dst, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid transport or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with incorrect linkid\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("Unable to add host_id 1: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_set_config(knet_h, 1, KNET_MAX_LINK, KNET_TRANSPORT_UDP, &src, &dst, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid linkid or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with incorrect src_addr\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, NULL, &dst, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid src_addr or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with dynamic dst_addr\n");
if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &src, NULL, 0) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((link_status.enabled != 0) ||
(strcmp(link_status.src_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.src_port, "50000")) ||
(knet_h->host_index[1]->link[0].dynamic != KNET_LINK_DYNIP)) {
printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dynamic %u\n",
link_status.enabled, link_status.src_ipaddr, link_status.src_port, knet_h->host_index[1]->link[0].dynamic);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with already configured link\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &src, NULL, 0) || (errno != EBUSY))) {
printf("knet_link_set_config accepted request while link configured or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with link enabled\n");
if (knet_link_set_enable(knet_h, 1, 0, 1) < 0) {
printf("Unable to enable link: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &src, NULL, 0)) || (errno != EBUSY)) {
printf("knet_link_set_config accepted request while link enabled or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_set_enable(knet_h, 1, 0, 0) < 0) {
printf("Unable to disable link: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_clear_config(knet_h, 1, 0) < 0) {
printf("Unable to clear link config: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with static dst_addr\n");
if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &src, &dst, 0) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((link_status.enabled != 0) ||
(strcmp(link_status.src_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.src_port, "50000")) ||
(strcmp(link_status.dst_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.dst_port, "50001")) ||
(knet_h->host_index[1]->link[0].dynamic != KNET_LINK_STATIC)) {
printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dst_addr %s dst_port %s dynamic %u\n",
link_status.enabled, link_status.src_ipaddr, link_status.src_port, link_status.dst_ipaddr, link_status.dst_port, knet_h->host_index[1]->link[0].dynamic);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
need_root();
test();
return PASS;
}
diff --git a/libknet/threads_heartbeat.c b/libknet/threads_heartbeat.c
index e29c0a0b..d16e5580 100644
--- a/libknet/threads_heartbeat.c
+++ b/libknet/threads_heartbeat.c
@@ -1,173 +1,173 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
-#include "link.h"
+#include "links.h"
#include "logging.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
memset(&dst_link->pmtud_last, 0, sizeof(struct timespec));
dst_link->received_pong = 0;
dst_link->status.pong_last.tv_nsec = 0;
if (dst_link->status.connected == 1) {
log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
dst_host->host_id, dst_link->link_id);
_link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0);
}
}
static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
{
int err = 0, savederrno = 0;
int len;
ssize_t outlen = KNET_HEADER_PING_SIZE;
struct timespec clock_now, pong_last;
unsigned long long diff_ping;
unsigned char *outbuf = (unsigned char *)knet_h->pingbuf;
if (dst_link->transport_connected == 0) {
_link_down(knet_h, dst_host, dst_link);
return;
}
/* caching last pong to avoid race conditions */
pong_last = dst_link->status.pong_last;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get monotonic clock");
return;
}
timespec_diff(dst_link->ping_last, clock_now, &diff_ping);
if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) {
memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec));
knet_h->pingbuf->khp_ping_link = dst_link->link_id;
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock");
return;
}
knet_h->pingbuf->khp_ping_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
knet_h->pingbuf->khp_ping_timed = timed;
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pingbuf,
outlen,
knet_h->pingbuf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to crypto ping packet");
return;
}
outbuf = knet_h->pingbuf_crypt;
}
retry:
len = sendto(dst_link->outsock, outbuf, outlen,
MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
dst_link->ping_last = clock_now;
dst_link->status.stats.tx_ping_packets++;
dst_link->status.stats.tx_ping_bytes += outlen;
if (len != outlen) {
err = knet_h->transport_ops[dst_link->transport_type]->transport_tx_sock_error(knet_h, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_HEARTBEAT,
"Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
dst_link->outsock, savederrno, strerror(savederrno),
dst_link->status.src_ipaddr, dst_link->status.src_port,
dst_link->status.dst_ipaddr, dst_link->status.dst_port);
dst_link->status.stats.tx_ping_errors++;
break;
case 0:
break;
case 1:
dst_link->status.stats.tx_ping_retries++;
goto retry;
break;
}
} else {
dst_link->last_ping_size = outlen;
}
}
timespec_diff(pong_last, clock_now, &diff_ping);
if ((pong_last.tv_nsec) &&
(diff_ping >= (dst_link->pong_timeout * 1000llu))) {
_link_down(knet_h, dst_host, dst_link);
}
}
void _send_pings(knet_handle_t knet_h, int timed)
{
struct knet_host *dst_host;
int link_idx;
if (pthread_mutex_lock(&knet_h->hb_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock");
return;
}
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport_type == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;
_handle_check_each(knet_h, dst_host, &dst_host->link[link_idx], timed);
}
}
pthread_mutex_unlock(&knet_h->hb_mutex);
}
void *_handle_heartbt_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
/* preparing ping buffer */
knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
knet_h->pingbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock");
continue;
}
_send_pings(knet_h, 1);
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index 13f9454a..999597a9 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,409 +1,409 @@
/*
* Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
-#include "link.h"
+#include "links.h"
#include "host.h"
#include "logging.h"
#include "threads_common.h"
#include "threads_pmtud.h"
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int err, ret, savederrno, mutex_retry_limit, failsafe;
ssize_t onwire_len; /* current packet onwire size */
ssize_t overhead_len; /* onwire packet overhead (protocol based) */
ssize_t max_mtu_len; /* max mtu for protocol */
ssize_t data_len; /* how much data we can send in the packet
* generally would be onwire_len - overhead_len
* needs to be adjusted for crypto
*/
ssize_t pad_len; /* crypto packet pad size, needs to move into crypto.c callbacks */
int len; /* len of what we were able to sendto onwire */
struct timespec ts;
unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;
mutex_retry_limit = 0;
failsafe = 0;
pad_len = 0;
dst_link->last_bad_mtu = 0;
knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
max_mtu_len = KNET_PMTUD_SIZE_V6;
overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
break;
case AF_INET:
max_mtu_len = KNET_PMTUD_SIZE_V4;
overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
return -1;
break;
}
/*
* discovery starts from the top because kernel will
* refuse to send packets > current iface mtu.
* this saves us some time and network bw.
*/
onwire_len = max_mtu_len;
restart:
/*
* prevent a race when interface mtu is changed _exactly_ during
* the discovery process and it's complex to detect. Easier
* to wait the next loop.
* 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't
* take more than 18/19 steps.
*/
if (failsafe == 30) {
log_err(knet_h, KNET_SUB_PMTUD,
"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
return -1;
} else {
failsafe++;
}
data_len = onwire_len - overhead_len;
if (knet_h->crypto_instance) {
if (knet_h->sec_block_size) {
pad_len = knet_h->sec_block_size - (data_len % knet_h->sec_block_size);
if (pad_len == knet_h->sec_block_size) {
pad_len = 0;
}
data_len = data_len + pad_len;
}
data_len = data_len + (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
if (knet_h->sec_block_size) {
while (data_len + overhead_len >= max_mtu_len) {
data_len = data_len - knet_h->sec_block_size;
}
}
if (dst_link->last_bad_mtu) {
while (data_len + overhead_len >= dst_link->last_bad_mtu) {
data_len = data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
}
}
if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size) + 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
return -1;
}
onwire_len = data_len + overhead_len;
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pmtudbuf,
data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size),
knet_h->pmtudbuf_crypt,
&data_len) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
return -1;
}
outbuf = knet_h->pmtudbuf_crypt;
} else {
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
}
/* link has gone down, aborting pmtud */
if (dst_link->status.connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (dst_link->transport_connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
retry:
len = sendto(dst_link->outsock, outbuf, data_len,
MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
err = knet_h->transport_ops[dst_link->transport_type]->transport_tx_sock_error(knet_h, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
dst_link->status.stats.tx_pmtu_errors++;
return -1;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
dst_link->status.stats.tx_pmtu_retries++;
goto retry;
break;
}
if (len != data_len) {
if (savederrno == EMSGSIZE) {
dst_link->last_bad_mtu = onwire_len;
} else {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zd err: %s", onwire_len, strerror(savederrno));
}
} else {
dst_link->last_sent_mtu = onwire_len;
dst_link->last_recv_mtu = 0;
dst_link->status.stats.tx_pmtu_packets++;
dst_link->status.stats.tx_pmtu_bytes += data_len;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
/*
* Set an artibrary 2 seconds timeout to receive a PMTUd reply
* perhaps this should be configurable but:
* 1) too short timeout can cause instability since MTU value
* influeces link status
* 2) too high timeout slows down the MTU detection process for
* small MTU
*
* Another option is to make the PMTUd process less influent
* in link status detection but that could cause data packet loss
* without link up/down changes
*/
ts.tv_sec += 2;
ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);
if (shutdown_in_progress(knet_h)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
return -1;
}
if ((ret != 0) && (ret != ETIMEDOUT)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (mutex_retry_limit == 3) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
return -1;
}
mutex_retry_limit++;
goto restart;
}
if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
dst_link->last_bad_mtu = onwire_len;
} else {
int found_mtu = 0;
if (knet_h->sec_block_size) {
if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
found_mtu = 1;
}
} else {
if ((onwire_len == max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1)))) {
found_mtu = 1;
}
}
if (found_mtu) {
/*
* account for IP overhead, knet headers and crypto in PMTU calculation
*/
dst_link->status.mtu = onwire_len - dst_link->status.proto_overhead;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
dst_link->last_good_mtu = onwire_len;
}
}
onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
goto restart;
}
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, unsigned int *min_mtu)
{
uint8_t saved_valid_pmtud;
unsigned int saved_pmtud;
struct timespec clock_now;
unsigned long long diff_pmtud, interval;
interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
return 0;
}
timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);
if (diff_pmtud < interval) {
*min_mtu = dst_link->status.mtu;
return dst_link->has_valid_mtu;
}
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
case AF_INET:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
}
saved_pmtud = dst_link->status.mtu;
saved_valid_pmtud = dst_link->has_valid_mtu;
log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);
if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
dst_link->has_valid_mtu = 0;
} else {
dst_link->has_valid_mtu = 1;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V6) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V6)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv6 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
case AF_INET:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V4) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V4)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv4 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
}
if (dst_link->has_valid_mtu) {
if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
}
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
if (dst_link->status.mtu < *min_mtu) {
*min_mtu = dst_link->status.mtu;
}
dst_link->pmtud_last = clock_now;
}
}
if (saved_valid_pmtud != dst_link->has_valid_mtu) {
_host_dstcache_update_sync(knet_h, dst_host);
}
return dst_link->has_valid_mtu;
}
void *_handle_pmtud_link_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
unsigned int min_mtu, have_mtu;
unsigned int lower_mtu;
knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
continue;
}
lower_mtu = KNET_PMTUD_SIZE_V4;
min_mtu = KNET_PMTUD_SIZE_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
have_mtu = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
dst_link = &dst_host->link[link_idx];
if ((dst_link->status.enabled != 1) ||
(dst_link->status.connected != 1) ||
(dst_host->link[link_idx].transport_type == KNET_TRANSPORT_LOOPBACK) ||
(!dst_link->last_ping_size) ||
((dst_link->dynamic == KNET_LINK_DYNIP) &&
(dst_link->status.dynconnected != 1)))
continue;
if (_handle_check_pmtud(knet_h, dst_host, dst_link, &min_mtu)) {
have_mtu = 1;
if (min_mtu < lower_mtu) {
lower_mtu = min_mtu;
}
}
}
}
if (have_mtu) {
if (knet_h->data_mtu != lower_mtu) {
knet_h->data_mtu = lower_mtu;
log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);
if (knet_h->pmtud_notify_fn) {
knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
knet_h->data_mtu);
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index f6923e8a..85586184 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,820 +1,820 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
-#include "link.h"
+#include "links.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this functions needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
if (errno == ETIME) {
log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
}
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not is use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event when the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymettric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet lenght
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
if (knet_h->crypto_instance) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
if (crypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = crypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
crypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
src_link = NULL;
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
}
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
/*
* TODO: should we accept data even if we can't reply to the other node?
* how would that work with SCTP and guaranteed delivery?
*/
if (!src_host->status.reachable) {
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
//return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
if (src_link) {
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
if (!err) {
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed ping, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrives in bursts over all the link
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
retry_pong:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
if (len != outlen) {
err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
src_link->status.latency =
((src_link->status.latency * src_link->latency_exp) +
((latency_last / 1000llu) *
(src_link->latency_fix - src_link->latency_exp))) /
src_link->latency_fix;
if (src_link->status.latency < src_link->pong_timeout) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.latency > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.latency;
}
if (src_link->status.latency < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.latency;
}
src_link->status.stats.latency_ave =
(src_link->status.stats.latency_ave * src_link->status.stats.latency_samples +
src_link->status.latency) / (src_link->status.stats.latency_samples+1);
src_link->status.stats.latency_samples++;
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
}
retry_pmtud:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
if (len != outlen) {
err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
goto retry_pmtud;
break;
}
}
break;
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
break;
default:
return;
}
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if a fd got an event and before we grab the read lock
* and the link is removed by another thread
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
* it will be visibile in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol base
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocols packet handlers are good
*/
switch(err) {
case -1: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case 0: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case 1: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case 2: /* packet is data and should be parsed as such */
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
memset(&msg, 0, sizeof(msg));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
return NULL;
}
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index b766fcf4..df8ed168 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1440 +1,1440 @@
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include "compat.h"
#include "host.h"
-#include "link.h"
+#include "links.h"
#include "logging.h"
#include "common.h"
#include "transports.h"
#include "threads_common.h"
#ifdef HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
/*
* https://en.wikipedia.org/wiki/SCTP_packet_structure
*/
#define KNET_PMTUD_SCTP_OVERHEAD_COMMON 12
#define KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK 16
#define KNET_PMTUD_SCTP_OVERHEAD KNET_PMTUD_SCTP_OVERHEAD_COMMON + KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK
typedef struct sctp_handle_info {
struct knet_list_head listen_links_list;
struct knet_list_head connect_links_list;
int connect_epollfd;
int connectsockfd[2];
int listen_epollfd;
int listensockfd[2];
pthread_t connect_thread;
pthread_t listen_thread;
} sctp_handle_info_t;
/*
* use by fd_tracker data type
*/
#define SCTP_NO_LINK_INFO 0
#define SCTP_LISTENER_LINK_INFO 1
#define SCTP_ACCEPTED_LINK_INFO 2
#define SCTP_CONNECT_LINK_INFO 3
/*
* this value is per listener
*/
#define MAX_ACCEPTED_SOCKS 256
typedef struct sctp_listen_link_info {
struct knet_list_head list;
int listen_sock;
int accepted_socks[MAX_ACCEPTED_SOCKS];
struct sockaddr_storage src_address;
int on_listener_epoll;
int on_rx_epoll;
} sctp_listen_link_info_t;
typedef struct sctp_accepted_link_info {
char mread_buf[KNET_DATABUFSIZE];
ssize_t mread_len;
sctp_listen_link_info_t *link_info;
} sctp_accepted_link_info_t ;
typedef struct sctp_connect_link_info {
struct knet_list_head list;
sctp_listen_link_info_t *listener;
struct knet_link *link;
struct sockaddr_storage dst_address;
int connect_sock;
int on_connected_epoll;
int on_rx_epoll;
int close_sock;
} sctp_connect_link_info_t;
/*
* socket handling functions
*
* those functions do NOT perform locking. locking
* should be handled in the right context from callers
*/
/*
* sockets are removed from rx_epoll from callers
* see also error handling functions
*/
static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
if (info->on_connected_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = info->connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from the epoll pool: %s",
strerror(errno));
goto exit_error;
}
info->on_connected_epoll = 0;
}
exit_error:
if (info->connect_sock != -1) {
if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
close(info->connect_sock);
info->connect_sock = -1;
}
errno = savederrno;
return err;
}
static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
{
int err = 0, savederrno = 0;
struct sctp_event_subscribe events;
memset(&events, 0, sizeof (events));
events.sctp_data_io_event = 1;
events.sctp_association_event = 1;
events.sctp_send_failure_event = 1;
events.sctp_address_event = 1;
events.sctp_peer_error_event = 1;
events.sctp_shutdown_event = 1;
if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, &events, sizeof (events)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
type, strerror(savederrno));
}
errno = savederrno;
return err;
}
static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
int level;
#ifdef SOL_SCTP
level = SOL_SCTP;
#else
level = IPPROTO_SCTP;
#endif
if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
value = 1;
if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
strerror(savederrno));
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
savederrno = errno;
err = -1;
}
exit_error:
errno = savederrno;
return err;
}
static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) {
if ((errno != EALREADY) && (errno != EINPROGRESS) && (errno != EISCONN)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
info->connect_sock, strerror(savederrno));
goto exit_error;
}
}
if (!info->on_connected_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = info->connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add send/recv to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_connected_epoll = 1;
}
exit_error:
errno = savederrno;
return err;
}
static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
int connect_sock;
connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (connect_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->connect_sock = connect_sock;
info->close_sock = 0;
if (_reconnect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
exit_error:
if (err) {
if (info->on_connected_epoll) {
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev);
}
if (connect_sock >= 0) {
close(connect_sock);
}
}
errno = savederrno;
return err;
}
static int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
if (recv_err < 0) {
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
if (connect_info->link->transport_connected == 0) {
return -1;
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (listen_info->on_rx_epoll == 0) {
return -1;
}
}
break;
}
if (recv_errno == EAGAIN) {
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
usleep(KNET_THREADS_TIMERES / 16);
pthread_rwlock_rdlock(&knet_h->global_rwlock);
return 1;
}
return -1;
}
return 0;
}
/*
* socket error management functions
*
* both called with global read lock.
*
* NOTE: we need to remove the fd from the epoll as soon as possible
* even before we notify the respective thread to take care of it
* because scheduling can make it so that this thread will overload
* and the threads supposed to take care of the error will never
* be able to take action.
* we CANNOT handle FDs here diretly (close/reconnect/etc) due
* to locking context. We need to delegate that to their respective
* management threads within global write lock.
*
* this function is called from:
* - RX thread with recv_err <= 0 directly on recvmmsg error
* - transport_rx_is_data when msg_len == 0 (recv_err = 1)
* - transport_rx_is_data on notification (recv_err = 2)
*
* basically this small abouse of recv_err is to detect notifications
* generated by sockets created by listen().
*/
static int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
struct epoll_event ev;
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
/*
* all connect link have notifications enabled
* and we accept only data from notification and
* generic recvmmsg errors.
*
* Errors generated by msg_len 0 can be ignored because
* they follow a notification (double notification)
*/
if (recv_err != 1) {
connect_info->link->transport_connected = 0;
if (connect_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
connect_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (recv_err != 1) {
if (listen_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
listen_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
}
}
} else {
/*
* this means the listen() socket has generated
* a notification. now what? :-)
*/
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
}
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
break;
}
/*
* Under RX pressure we need to give time to IPC to pick up the message
*/
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
usleep(KNET_THREADS_TIMERES / 2);
pthread_rwlock_rdlock(&knet_h->global_rwlock);
return 0;
}
/*
* NOTE: sctp_transport_rx_is_data is called with global rdlock
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
static int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
size_t i;
struct iovec *iov = msg->msg_hdr.msg_iov;
size_t iovlen = msg->msg_hdr.msg_iovlen;
struct sctp_assoc_change *sac;
union sctp_notification *snp;
sctp_accepted_link_info_t *info = knet_h->knet_transport_fd_tracker[sockfd].data;
if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
if (msg->msg_len == 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "received 0 bytes len packet: %d", sockfd);
/*
* NOTE: with event notification enabled, we receive error twice:
* 1) from the event notification
* 2) followed by a 0 byte msg_len
*
* This is generally not a problem if not for causing extra
* handling for the same issue. Should we drop notifications
* and keep the code generic (handle all errors via msg_len = 0)
* or keep the duplication as safety measure, or drop msg_len = 0
* handling (what about sockets without events enabled?)
*/
sctp_transport_rx_sock_error(knet_h, sockfd, 1, 0);
return 1;
}
/*
* missing MSG_EOR has to be treated as a short read
* from the socket and we need to fill in the mread buf
* while we wait for MSG_EOR
*/
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
/*
* copy the incoming data into mread_buf + mread_len (incremental)
* and increase mread_len
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
return 0;
}
/*
* got EOR.
* if mread_len is > 0 we are completing a packet from short reads
* complete reassembling the packet in mread_buf, copy it back in the iov
* and set the iov/msg len numbers (size) correctly
*/
if (info->mread_len) {
/*
* add last fragment to mread_buf
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
/*
* move all back into the iovec
*/
memmove(iov->iov_base, info->mread_buf, info->mread_len);
msg->msg_len = info->mread_len;
info->mread_len = 0;
}
return 2;
}
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
return 1;
}
for (i=0; i< iovlen; i++) {
snp = iov[i].iov_base;
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change");
sac = &snp->sn_assoc_change;
if (sac->sac_state == SCTP_COMM_LOST) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change: comm_lost");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
}
break;
case SCTP_SHUTDOWN_EVENT:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
break;
case SCTP_SEND_FAILED:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed");
break;
case SCTP_PEER_ADDR_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change");
break;
case SCTP_REMOTE_ERROR:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error");
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event type: %hu\n", snp->sn_header.sn_type);
break;
}
}
return 0;
}
/*
* connect / outgoing socket management thread
*/
/*
* _handle_connected_sctp* are called with a global write lock
* from the connect_thread
*/
static void _handle_connected_sctp(knet_handle_t knet_h, int connect_sock)
{
int err;
struct epoll_event ev;
unsigned int status, len = sizeof(status);
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
struct knet_link *kn_link = info->link;
err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
if (err) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
connect_sock, strerror(errno));
return;
}
if (info->close_sock) {
if (_close_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp: %s", connect_sock, strerror(errno));
return;
}
info->close_sock = 0;
if (_create_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
return;
}
}
if (status) {
log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
strerror(status));
/*
* No need to create a new socket if connect failed,
* just retry connect
*/
_reconnect_socket(knet_h, info->link);
return;
}
/*
* Connected - Remove us from the connect epoll
*/
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket %d from epoll pool: %s",
connect_sock, strerror(errno));
}
info->on_connected_epoll = 0;
kn_link->transport_connected = 1;
kn_link->outsock = info->connect_sock;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
connect_sock,
kn_link->status.dst_ipaddr, kn_link->status.dst_port);
}
static void _handle_connected_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info;
if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd);
info = knet_h->knet_transport_fd_tracker[sockfd].data;
info->close_sock = 1;
info->link->transport_connected = 0;
_reconnect_socket(knet_h, info->link);
}
static void *_sctp_connect_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
/*
* Sort out which FD has a connection
*/
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* minor optimization: deduplicate events
*
* in some cases we can receive multiple notifcations
* of the same FD having issues or need handling.
* It's enough to process it once even tho it's safe
* to handle them multiple times.
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->connectsockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
_handle_connected_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_connected_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for dead fd %d\n", events[i].data.fd);
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* this thread can generate events for itself.
* we need to sleep in between loops to allow other threads
* to be scheduled
*/
usleep(knet_h->reconnect_int * 1000);
}
return NULL;
}
/*
* listen/incoming connections management thread
*/
/*
* Listener received a new connection
* called with a write lock from main thread
*/
static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
{
int err = 0, savederrno = 0;
int new_fd;
int i = -1;
sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
struct epoll_event ev;
struct sockaddr_storage ss;
socklen_t sock_len = sizeof(ss);
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
sctp_accepted_link_info_t *accept_info = NULL;
new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
if (new_fd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno));
goto exit_error;
}
if (knet_addrtostr(&ss, sizeof(ss),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
addr_str, port_str);
/*
* Keep a track of all accepted FDs
*/
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] == -1) {
info->accepted_socks[i] = new_fd;
break;
}
}
if (i == MAX_ACCEPTED_SOCKS) {
errno = EBUSY;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
goto exit_error;
}
if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
savederrno = errno;
err = -1;
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
accept_info = malloc(sizeof(sctp_accepted_link_info_t));
if (!accept_info) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));
accept_info->link_info = info;
if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(errno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = new_fd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
new_fd, strerror(errno));
goto exit_error;
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
new_fd, addr_str, port_str, info->listen_sock, i);
exit_error:
if (err) {
if ((i >= 0) || (i < MAX_ACCEPTED_SOCKS)) {
info->accepted_socks[i] = -1;
}
_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
free(accept_info);
close(new_fd);
}
errno = savederrno;
return;
}
/*
* Listen thread received a notification of a bad socket that needs closing
* called with a write lock from main thread
*/
static void _handle_listen_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_accepted_link_info_t *accept_info;
sctp_listen_link_info_t *info;
struct knet_host *host;
int link_idx;
int i;
if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);
accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
info = accept_info->link_info;
/*
* clear all links using this accepted socket as
* outbound dynamically connected socket
*/
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(host->link[link_idx].outsock == sockfd)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
host->host_id, link_idx, sockfd);
host->link[link_idx].status.dynconnected = 0;
host->link[link_idx].transport_connected = 0;
host->link[link_idx].outsock = 0;
memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
}
}
}
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (sockfd == info->accepted_socks[i]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
info->accepted_socks[i] = -1;
free(accept_info);
close(sockfd);
}
}
}
static void *_sctp_listen_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* Sort out which FD has an incoming connection
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->listensockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
_handle_listen_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_incoming_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
/*
* sctp_link_listener_start/stop are called in global write lock
* context from set_config and clear_config.
*/
static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int listen_sock = -1;
struct epoll_event ev;
sctp_listen_link_info_t *info = NULL;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* Only allocate a new listener if src address is different
*/
knet_list_for_each_entry(info, &handle_info->listen_links_list, list) {
if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
return info;
}
}
info = malloc(sizeof(sctp_listen_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(sctp_listen_link_info_t));
memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
memcpy(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (listen_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (listen(listen_sock, 5) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 1;
info->listen_sock = listen_sock;
knet_list_add(&info->list, &handle_info->listen_links_list);
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);
exit_error:
if (err) {
if (info->on_listener_epoll) {
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
}
if (listen_sock >= 0) {
close(listen_sock);
}
if (info) {
free(info);
info = NULL;
}
}
errno = savederrno;
return info;
}
static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int found = 0, i;
struct knet_host *host;
int link_idx;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
sctp_connect_link_info_t *link_info;
struct epoll_event ev;
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&host->link[link_idx] == kn_link)
continue;
link_info = host->link[link_idx].transport_link;
if ((link_info) &&
(link_info->listener == info) &&
(host->link[link_idx].status.enabled == 1)) {
found = 1;
break;
}
}
}
if (found) {
this_link_info->listener = NULL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock);
savederrno = EBUSY;
err = -1;
goto exit_error;
}
if (info->on_listener_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
close(info->listen_sock);
for (i=0; i< MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] > -1) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->accepted_socks[i];
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 0;
free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data);
close(info->accepted_socks[i]);
if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->accepted_socks[i] = -1;
}
}
knet_list_del(&info->list);
free(info);
this_link_info->listener = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* Links config/clear. Both called with global wrlock from link_set_config/clear_config
*/
static int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int savederrno = 0, err = 0;
sctp_connect_link_info_t *info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
info = malloc(sizeof(sctp_connect_link_info_t));
if (!info) {
goto exit_error;
}
memset(info, 0, sizeof(sctp_connect_link_info_t));
kn_link->transport_link = info;
info->link = kn_link;
memcpy(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
info->on_connected_epoll = 0;
info->connect_sock = -1;
info->listener = sctp_link_listener_start(knet_h, kn_link);
if (!info->listener) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (kn_link->dynamic == KNET_LINK_STATIC) {
if (_create_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
kn_link->outsock = info->connect_sock;
}
knet_list_add(&info->list, &handle_info->connect_links_list);
exit_error:
if (err) {
if (info) {
if (info->connect_sock) {
close(info->connect_sock);
}
if (info->listener) {
sctp_link_listener_stop(knet_h, kn_link);
}
kn_link->transport_link = NULL;
free(info);
}
}
errno = savederrno;
return err;
}
/*
* called with global wrlock
*/
static int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info;
struct epoll_event ev;
if (!kn_link) {
errno = EINVAL;
return -1;
}
info = kn_link->transport_link;
if (!info) {
errno = EINVAL;
return -1;
}
if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener trasport: %s",
strerror(savederrno));
goto exit_error;
}
if (info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_rx_epoll = 0;
}
if (_close_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
strerror(savederrno));
goto exit_error;
}
knet_list_del(&info->list);
free(info);
kn_link->transport_link = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* transport_free and transport_init are
* called only from knet_handle_new and knet_handle_free.
* all resources (hosts/links) should have been already freed at this point
* and they are called in a write locked context, hence they
* don't need their own locking.
*/
static int sctp_transport_free(knet_handle_t knet_h)
{
sctp_handle_info_t *handle_info;
void *thread_status;
struct epoll_event ev;
if (!knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EINVAL;
return -1;
}
handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* keep it here while we debug list usage and such
*/
if (!knet_list_empty(&handle_info->listen_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
}
if (!knet_list_empty(&handle_info->connect_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
}
if (handle_info->listen_thread) {
pthread_cancel(handle_info->listen_thread);
pthread_join(handle_info->listen_thread, &thread_status);
}
if (handle_info->connect_thread) {
pthread_cancel(handle_info->connect_thread);
pthread_join(handle_info->connect_thread, &thread_status);
}
if (handle_info->listensockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev);
}
if (handle_info->connectsockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev);
}
_close_socketpair(knet_h, handle_info->connectsockfd);
_close_socketpair(knet_h, handle_info->listensockfd);
if (handle_info->listen_epollfd >= 0) {
close(handle_info->listen_epollfd);
}
if (handle_info->connect_epollfd >= 0) {
close(handle_info->connect_epollfd);
}
free(handle_info);
knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;
return 0;
}
static int sctp_transport_init(knet_handle_t knet_h)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info;
struct epoll_event ev;
if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EEXIST;
return -1;
}
handle_info = malloc(sizeof(sctp_handle_info_t));
if (!handle_info) {
return -1;
}
memset(handle_info, 0,sizeof(sctp_handle_info_t));
knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;
knet_list_init(&handle_info->listen_links_list);
knet_list_init(&handle_info->connect_links_list);
handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->listen_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->listen_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->connect_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->connect_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
/*
* Start connect & listener threads
*/
savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
strerror(savederrno));
goto exit_fail;
}
exit_fail:
if (err < 0) {
sctp_transport_free(knet_h);
}
errno = savederrno;
return err;
}
static int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
kn_link->outsock = sockfd;
kn_link->status.dynconnected = 1;
kn_link->transport_connected = 1;
return 0;
}
static knet_transport_ops_t sctp_transport_ops = {
.transport_name = "SCTP",
.transport_id = KNET_TRANSPORT_SCTP,
.transport_mtu_overhead = KNET_PMTUD_SCTP_OVERHEAD,
.transport_init = sctp_transport_init,
.transport_free = sctp_transport_free,
.transport_link_set_config = sctp_transport_link_set_config,
.transport_link_clear_config = sctp_transport_link_clear_config,
.transport_link_dyn_connect = sctp_transport_link_dyn_connect,
.transport_rx_sock_error = sctp_transport_rx_sock_error,
.transport_tx_sock_error = sctp_transport_tx_sock_error,
.transport_rx_is_data = sctp_transport_rx_is_data,
};
knet_transport_ops_t *get_sctp_transport()
{
return &sctp_transport_ops;
}
#else // HAVE_NETINET_SCTP_H
knet_transport_ops_t *get_sctp_transport()
{
return NULL;
}
#endif
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Wed, Feb 26, 3:13 AM (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1465125
Default Alt Text
(123 KB)
Attached To
Mode
rK kronosnet
Attached
Detach File
Event Timeline
Log In to Comment