Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/handle.c b/libknet/handle.c
index 15133dfb..37b28a7a 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1590 +1,1584 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "links.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "transport_common.h"
#include "logging.h"
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_rwlock_t shlib_rwlock;
static uint8_t shlib_wrlock_init = 0;
static uint32_t knet_ref = 0;
static int _init_shlib_tracker(knet_handle_t knet_h)
{
int savederrno = 0;
if (!shlib_wrlock_init) {
savederrno = pthread_rwlock_init(&shlib_rwlock, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
shlib_wrlock_init = 1;
}
return 0;
}
static void _fini_shlib_tracker(void)
{
if (knet_ref == 0) {
pthread_rwlock_destroy(&shlib_rwlock);
shlib_wrlock_init = 0;
}
return;
}
static int _init_locks(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
strerror(savederrno));
goto exit_fail;
}
knet_h->lock_init_done = 1;
savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->backoff_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _destroy_locks(knet_handle_t knet_h)
{
knet_h->lock_init_done = 0;
pthread_rwlock_destroy(&knet_h->global_rwlock);
pthread_mutex_destroy(&knet_h->pmtud_mutex);
pthread_cond_destroy(&knet_h->pmtud_cond);
pthread_mutex_destroy(&knet_h->hb_mutex);
pthread_mutex_destroy(&knet_h->tx_mutex);
pthread_mutex_destroy(&knet_h->backoff_mutex);
pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
}
static int _init_socks(knet_handle_t knet_h)
{
int savederrno = 0;
if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _close_socks(knet_handle_t knet_h)
{
_close_socketpair(knet_h, knet_h->dstsockfd);
_close_socketpair(knet_h, knet_h->hostsockfd);
}
static int _init_buffers(knet_handle_t knet_h)
{
int savederrno = 0;
int i;
size_t bufsize;
for (i = 0; i < PCKT_FRAG_MAX; i++) {
bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE;
knet_h->send_to_links_buf[i] = malloc(bufsize);
if (!knet_h->send_to_links_buf[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf[i], 0, bufsize);
}
for (i = 0; i < PCKT_RX_BUFS; i++) {
knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE);
if (!knet_h->recv_from_links_buf[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE);
}
knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
if (!knet_h->recv_from_sock_buf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);
knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
if (!knet_h->pingbuf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);
knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
if (!knet_h->pmtudbuf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);
for (i = 0; i < PCKT_FRAG_MAX; i++) {
bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
knet_h->send_to_links_buf_crypt[i] = malloc(bufsize);
if (!knet_h->send_to_links_buf_crypt[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize);
}
knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->recv_from_links_buf_decrypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->recv_from_links_buf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->pingbuf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->pmtudbuf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
if (!knet_h->recv_from_links_buf_decompress) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);
knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
if (!knet_h->send_to_links_buf_compress) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);
memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker));
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _destroy_buffers(knet_handle_t knet_h)
{
int i;
for (i = 0; i < PCKT_FRAG_MAX; i++) {
free(knet_h->send_to_links_buf[i]);
free(knet_h->send_to_links_buf_crypt[i]);
}
for (i = 0; i < PCKT_RX_BUFS; i++) {
free(knet_h->recv_from_links_buf[i]);
}
free(knet_h->recv_from_links_buf_decompress);
free(knet_h->send_to_links_buf_compress);
free(knet_h->recv_from_sock_buf);
free(knet_h->recv_from_links_buf_decrypt);
free(knet_h->recv_from_links_buf_crypt);
free(knet_h->pingbuf);
free(knet_h->pingbuf_crypt);
free(knet_h->pmtudbuf);
free(knet_h->pmtudbuf_crypt);
}
static int _init_epolls(knet_handle_t knet_h)
{
struct epoll_event ev;
int savederrno = 0;
/*
* even if the kernel does dynamic allocation with epoll_ctl
* we need to reserve one extra for host to host communication
*/
knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (knet_h->send_to_links_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
strerror(savederrno));
goto exit_fail;
}
knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
if (knet_h->recv_from_links_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
strerror(savederrno));
goto exit_fail;
}
knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
if (knet_h->dst_link_handler_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->send_to_links_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->hostsockfd[0];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->dstsockfd[0];
if (epoll_ctl(knet_h->dst_link_handler_epollfd,
EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _close_epolls(knet_handle_t knet_h)
{
struct epoll_event ev;
int i;
memset(&ev, 0, sizeof(struct epoll_event));
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (knet_h->sockfd[i].in_use) {
epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev);
if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) {
_close_socketpair(knet_h, knet_h->sockfd[i].sockfd);
}
}
}
epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);
close(knet_h->send_to_links_epollfd);
close(knet_h->recv_from_links_epollfd);
close(knet_h->dst_link_handler_epollfd);
}
static int _start_threads(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
_handle_pmtud_link_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0,
_handle_dst_link_handler_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&knet_h->send_to_links_thread, 0,
_handle_send_to_links_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&knet_h->recv_from_links_thread, 0,
_handle_recv_from_links_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&knet_h->heartbt_thread, 0,
_handle_heartbt_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _stop_threads(knet_handle_t knet_h)
{
void *retval;
/*
* allow threads to catch on shutdown request
* and release locks before we stop them.
* this isn't the most efficent way to handle it
* but it works good enough for now
*/
sleep(1);
if (knet_h->heartbt_thread) {
pthread_cancel(knet_h->heartbt_thread);
pthread_join(knet_h->heartbt_thread, &retval);
}
if (knet_h->send_to_links_thread) {
pthread_cancel(knet_h->send_to_links_thread);
pthread_join(knet_h->send_to_links_thread, &retval);
}
if (knet_h->recv_from_links_thread) {
pthread_cancel(knet_h->recv_from_links_thread);
pthread_join(knet_h->recv_from_links_thread, &retval);
}
if (knet_h->dst_link_handler_thread) {
pthread_cancel(knet_h->dst_link_handler_thread);
pthread_join(knet_h->dst_link_handler_thread, &retval);
}
- pthread_mutex_lock(&knet_h->pmtud_mutex);
- pthread_cond_signal(&knet_h->pmtud_cond);
- pthread_mutex_unlock(&knet_h->pmtud_mutex);
-
- sleep(1);
-
if (knet_h->pmtud_link_handler_thread) {
pthread_cancel(knet_h->pmtud_link_handler_thread);
pthread_join(knet_h->pmtud_link_handler_thread, &retval);
}
}
knet_handle_t knet_handle_new(knet_node_id_t host_id,
int log_fd,
uint8_t default_log_level)
{
knet_handle_t knet_h;
int savederrno = 0;
struct rlimit cur;
if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
return NULL;
}
if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
errno = EINVAL;
return NULL;
}
/*
* validate incoming request
*/
if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
errno = EINVAL;
return NULL;
}
/*
* allocate handle
*/
knet_h = malloc(sizeof(struct knet_handle));
if (!knet_h) {
errno = ENOMEM;
return NULL;
}
memset(knet_h, 0, sizeof(struct knet_handle));
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
errno = savederrno;
goto exit_fail;
}
/*
* copy config in place
*/
knet_h->host_id = host_id;
knet_h->logfd = log_fd;
if (knet_h->logfd > 0) {
memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
}
/*
* set pmtud default timers
*/
knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;
/*
* set transports reconnect default timers
*/
knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;
/*
* Set 'min' stats to the maximum value so the
* first value we get is always less
*/
knet_h->stats.tx_compress_time_min = UINT64_MAX;
knet_h->stats.rx_compress_time_min = UINT64_MAX;
knet_h->stats.tx_crypt_time_min = UINT64_MAX;
knet_h->stats.rx_crypt_time_min = UINT64_MAX;
/*
* init global shlib tracker
*/
if (_init_shlib_tracker(knet_h) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handles traceker: %s",
strerror(savederrno));
errno = savederrno;
goto exit_fail;
}
/*
* init main locking structures
*/
if (_init_locks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* init sockets
*/
if (_init_socks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* allocate packet buffers
*/
if (_init_buffers(knet_h)) {
savederrno = errno;
goto exit_fail;
}
if (compress_init(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* create epoll fds
*/
if (_init_epolls(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start transports
*/
if (start_all_transports(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start internal threads
*/
if (_start_threads(knet_h)) {
savederrno = errno;
goto exit_fail;
}
knet_ref++;
pthread_mutex_unlock(&handle_config_mutex);
return knet_h;
exit_fail:
pthread_mutex_unlock(&handle_config_mutex);
knet_handle_free(knet_h);
errno = savederrno;
return NULL;
}
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h) {
pthread_mutex_unlock(&handle_config_mutex);
errno = EINVAL;
return -1;
}
if (!knet_h->lock_init_done) {
goto exit_nolock;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
pthread_mutex_unlock(&handle_config_mutex);
errno = savederrno;
return -1;
}
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
_stop_threads(knet_h);
stop_all_transports(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h);
compress_fini(knet_h, 1);
_destroy_locks(knet_h);
exit_nolock:
free(knet_h);
knet_h = NULL;
knet_ref--;
_fini_shlib_tracker();
pthread_mutex_unlock(&handle_config_mutex);
return 0;
}
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno))
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!sock_notify_fn) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
knet_h->sock_notify_fn = sock_notify_fn;
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
pthread_rwlock_unlock(&knet_h->global_rwlock);
return err;
}
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
if (*channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
savederrno = EEXIST;
err = -1;
goto out_unlock;
}
}
}
/*
* auto allocate a channel
*/
if (*channel < 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (!knet_h->sockfd[i].in_use) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
} else {
if (knet_h->sockfd[*channel].in_use) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
}
knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;
if (*datafd > 0) {
int sockopt;
socklen_t sockoptlen = sizeof(sockopt);
if (_fdset_cloexec(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
if (_fdset_nonblock(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
knet_h->sockfd[*channel].sockfd[0] = *datafd;
knet_h->sockfd[*channel].sockfd[1] = 0;
if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
knet_h->sockfd[*channel].is_socket = 1;
}
} else {
if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
savederrno = errno;
err = -1;
goto out_unlock;
}
knet_h->sockfd[*channel].is_created = 1;
knet_h->sockfd[*channel].is_socket = 1;
*datafd = knet_h->sockfd[*channel].sockfd[0];
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
if (knet_h->sockfd[*channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
}
goto out_unlock;
}
knet_h->sockfd[*channel].in_use = 1;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
int err = 0, savederrno = 0;
int8_t channel = -1;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
channel = i;
break;
}
}
if (channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if (!knet_h->sockfd[channel].has_error) {
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
goto out_unlock;
}
}
if (knet_h->sockfd[channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
}
memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
int err = 0, savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
*datafd = knet_h->sockfd[channel].sockfd[0];
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*channel = -1;
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_enable_filter(knet_handle_t knet_h,
void *dst_host_filter_fn_private_data,
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
knet_h->dst_host_filter_fn = dst_host_filter_fn;
if (knet_h->dst_host_filter_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->enabled = enabled;
if (enabled) {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*interval = knet_h->pmtud_interval;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!interval) || (interval > 86400)) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_interval = interval;
log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
void *pmtud_notify_fn_private_data,
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
knet_h->pmtud_notify_fn = pmtud_notify_fn;
if (knet_h->pmtud_notify_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_pmtud_get(knet_handle_t knet_h,
unsigned int *data_mtu)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!data_mtu) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*data_mtu = knet_h->data_mtu;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_crypto_cfg) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
crypto_fini(knet_h);
if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
(!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
err = 0;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
err = crypto_init(knet_h, knet_handle_crypto_cfg);
if (err) {
err = -2;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_compress_cfg) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
compress_fini(knet_h, 0);
err = compress_cfg(knet_h, knet_handle_compress_cfg);
savederrno = errno;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_out[1];
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *)buff;
iov_out[0].iov_len = buff_len;
err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!stats) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (struct_size > sizeof(struct knet_handle_stats)) {
struct_size = sizeof(struct knet_handle_stats);
}
memmove(stats, &knet_h->stats, struct_size);
/*
* TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
* RX is OK as it counts them before they are sorted.
*/
stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
knet_h->stats_extra.tx_crypt_pong_packets +
knet_h->stats_extra.tx_crypt_pmtu_packets +
knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
/* Tell the caller our full size in case they have an old version */
stats->size = sizeof(struct knet_handle_stats);
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
_link_clear_stats(knet_h);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
diff --git a/libknet/host.c b/libknet/host.c
index 7e597570..241723f9 100644
--- a/libknet/host.c
+++ b/libknet/host.c
@@ -1,710 +1,711 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include "host.h"
#include "internals.h"
#include "logging.h"
+#include "threads_common.h"
static void _host_list_update(knet_handle_t knet_h)
{
struct knet_host *host;
knet_h->host_ids_entries = 0;
for (host = knet_h->host_head; host != NULL; host = host->next) {
knet_h->host_ids[knet_h->host_ids_entries] = host->host_id;
knet_h->host_ids_entries++;
}
}
int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id)
{
int savederrno = 0, err = 0;
struct knet_host *host = NULL;
uint8_t link_idx;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (knet_h->host_index[host_id]) {
err = -1;
savederrno = EEXIST;
log_err(knet_h, KNET_SUB_HOST, "Unable to add host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
host = malloc(sizeof(struct knet_host));
if (!host) {
err = -1;
savederrno = errno;
log_err(knet_h, KNET_SUB_HOST, "Unable to allocate memory for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
memset(host, 0, sizeof(struct knet_host));
/*
* set host_id
*/
host->host_id = host_id;
/*
* set default host->name to host_id for logging
*/
snprintf(host->name, KNET_MAX_HOST_LEN - 1, "%u", host_id);
/*
* initialize links internal data
*/
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
host->link[link_idx].link_id = link_idx;
host->link[link_idx].status.stats.latency_min = UINT32_MAX;
}
/*
* add new host to the index
*/
knet_h->host_index[host_id] = host;
/*
* add new host to host list
*/
if (knet_h->host_head) {
host->next = knet_h->host_head;
}
knet_h->host_head = host;
_host_list_update(knet_h);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
if (err < 0) {
free(host);
}
errno = savederrno;
return err;
}
int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id)
{
int savederrno = 0, err = 0;
struct knet_host *host, *removed;
uint8_t link_idx;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
/*
* if links are configured we cannot release the host
*/
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].configured) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HOST, "Unable to remove host %u, links are still configured: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
}
removed = NULL;
/*
* removing host from list
*/
if (knet_h->host_head->host_id == host_id) {
removed = knet_h->host_head;
knet_h->host_head = removed->next;
} else {
for (host = knet_h->host_head; host->next != NULL; host = host->next) {
if (host->next->host_id == host_id) {
removed = host->next;
host->next = removed->next;
break;
}
}
}
knet_h->host_index[host_id] = NULL;
free(removed);
_host_list_update(knet_h);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id, const char *name)
{
int savederrno = 0, err = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u to set name: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (!name) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (strlen(name) >= KNET_MAX_HOST_LEN) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Requested name for host %u is too long: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
for (host = knet_h->host_head; host != NULL; host = host->next) {
if (!strncmp(host->name, name, KNET_MAX_HOST_LEN - 1)) {
err = -1;
savederrno = EEXIST;
log_err(knet_h, KNET_SUB_HOST, "Duplicated name found on host_id %u",
host->host_id);
goto exit_unlock;
}
}
snprintf(knet_h->host_index[host_id]->name, KNET_MAX_HOST_LEN - 1, "%s", name);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id,
char *name)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!name) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
savederrno = EINVAL;
err = -1;
log_debug(knet_h, KNET_SUB_HOST, "Host %u not found", host_id);
goto exit_unlock;
}
snprintf(name, KNET_MAX_HOST_LEN, "%s", knet_h->host_index[host_id]->name);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name,
knet_node_id_t *host_id)
{
int savederrno = 0, err = 0, found = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!name) {
errno = EINVAL;
return -1;
}
if (!host_id) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
for (host = knet_h->host_head; host != NULL; host = host->next) {
if (!strncmp(name, host->name, KNET_MAX_HOST_LEN)) {
found = 1;
*host_id = host->host_id;
break;
}
}
if (!found) {
savederrno = ENOENT;
err = -1;
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_get_host_list(knet_handle_t knet_h,
knet_node_id_t *host_ids, size_t *host_ids_entries)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!host_ids) || (!host_ids_entries)) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
memmove(host_ids, knet_h->host_ids, sizeof(knet_h->host_ids));
*host_ids_entries = knet_h->host_ids_entries;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t policy)
{
int savederrno = 0, err = 0;
uint8_t old_policy;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (policy > KNET_LINK_POLICY_RR) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to set name for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
old_policy = knet_h->host_index[host_id]->link_handler_policy;
knet_h->host_index[host_id]->link_handler_policy = policy;
if (_host_dstcache_update_async(knet_h, knet_h->host_index[host_id])) {
savederrno = errno;
err = -1;
knet_h->host_index[host_id]->link_handler_policy = old_policy;
log_debug(knet_h, KNET_SUB_HOST, "Unable to update switch cache for host %u: %s",
host_id, strerror(savederrno));
}
log_debug(knet_h, KNET_SUB_HOST, "Host %u has new switching policy: %u", host_id, policy);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *policy)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!policy) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->host_index[host_id]) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to get name for host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
*policy = knet_h->host_index[host_id]->link_handler_policy;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id,
struct knet_host_status *status)
{
int savederrno = 0, err = 0;
struct knet_host *host;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_HOST, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &host->status, sizeof(struct knet_host_status));
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_host_enable_status_change_notify(knet_handle_t knet_h,
void *host_status_change_notify_fn_private_data,
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HOST, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->host_status_change_notify_fn_private_data = host_status_change_notify_fn_private_data;
knet_h->host_status_change_notify_fn = host_status_change_notify_fn;
if (knet_h->host_status_change_notify_fn) {
log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HOST, "host_status_change_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int _send_host_info(knet_handle_t knet_h, const void *data, const size_t datalen)
{
ssize_t ret = 0;
if (knet_h->fini_in_progress) {
return 0;
}
ret = sendto(knet_h->hostsockfd[1], data, datalen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
if (ret < 0) {
log_debug(knet_h, KNET_SUB_HOST, "Unable to write data to hostpipe. Error: %s", strerror(errno));
return -1;
}
if ((size_t)ret != datalen) {
log_debug(knet_h, KNET_SUB_HOST, "Unable to write all data to hostpipe. Expected: %zu, Written: %zd.", datalen, ret);
return -1;
}
return 0;
}
static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num)
{
int i;
memset(host->circular_buffer, 0, KNET_CBUFFER_SIZE);
host->rx_seq_num = rx_seq_num;
memset(host->circular_buffer_defrag, 0, KNET_CBUFFER_SIZE);
for (i = 0; i < KNET_MAX_LINK; i++) {
memset(&host->defrag_buf[i], 0, sizeof(struct knet_host_defrag_buf));
}
}
/*
* check if a given packet seq num is in the circular buffers
* defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup
*/
int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf)
{
size_t i, j; /* circular buffer indexes */
seq_num_t seq_dist;
char *dst_cbuf = host->circular_buffer;
char *dst_cbuf_defrag = host->circular_buffer_defrag;
seq_num_t *dst_seq_num = &host->rx_seq_num;
if (clear_buf) {
_clear_cbuffers(host, seq_num);
}
if (seq_num < *dst_seq_num) {
seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num;
} else {
seq_dist = *dst_seq_num - seq_num;
}
j = seq_num % KNET_CBUFFER_SIZE;
if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */
if (!defrag_buf) {
return (dst_cbuf[j] == 0) ? 1 : 0;
} else {
return (dst_cbuf_defrag[j] == 0) ? 1 : 0;
}
} else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) {
memset(dst_cbuf, 0, KNET_CBUFFER_SIZE);
memset(dst_cbuf_defrag, 0, KNET_CBUFFER_SIZE);
*dst_seq_num = seq_num;
}
/* cleaning up circular buffer */
i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE;
if (i > j) {
memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i);
memset(dst_cbuf, 0, j + 1);
memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i);
memset(dst_cbuf_defrag, 0, j + 1);
} else {
memset(dst_cbuf + i, 0, j - i + 1);
memset(dst_cbuf_defrag + i, 0, j - i + 1);
}
*dst_seq_num = seq_num;
return 1;
}
void _seq_num_set(struct knet_host *host, seq_num_t seq_num, int defrag_buf)
{
if (!defrag_buf) {
host->circular_buffer[seq_num % KNET_CBUFFER_SIZE] = 1;
} else {
host->circular_buffer_defrag[seq_num % KNET_CBUFFER_SIZE] = 1;
}
return;
}
int _host_dstcache_update_async(knet_handle_t knet_h, struct knet_host *host)
{
int savederrno = 0;
knet_node_id_t host_id = host->host_id;
if (sendto(knet_h->dstsockfd[1], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(host_id)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_HOST, "Unable to write to dstpipefd[1]: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
return 0;
}
int _host_dstcache_update_sync(knet_handle_t knet_h, struct knet_host *host)
{
int link_idx;
int best_priority = -1;
int reachable = 0;
if (knet_h->host_id == host->host_id && knet_h->has_loop_link) {
host->active_link_entries = 1;
return 0;
}
host->active_link_entries = 0;
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (host->link[link_idx].status.enabled != 1) /* link is not enabled */
continue;
if (host->link[link_idx].status.connected != 1) /* link is not enabled */
continue;
if (host->link[link_idx].has_valid_mtu != 1) /* link does not have valid MTU */
continue;
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
/* for passive we look for the only active link with higher priority */
if (host->link[link_idx].priority > best_priority) {
host->active_links[0] = link_idx;
best_priority = host->link[link_idx].priority;
}
host->active_link_entries = 1;
} else {
/* for RR and ACTIVE we need to copy all available links */
host->active_links[host->active_link_entries] = link_idx;
host->active_link_entries++;
}
}
if (host->link_handler_policy == KNET_LINK_POLICY_PASSIVE) {
log_debug(knet_h, KNET_SUB_HOST, "host: %u (passive) best link: %u (pri: %u)",
host->host_id, host->link[host->active_links[0]].link_id,
host->link[host->active_links[0]].priority);
} else {
log_debug(knet_h, KNET_SUB_HOST, "host: %u has %u active links",
host->host_id, host->active_link_entries);
}
/* no active links, we can clean the circular buffers and indexes */
if (!host->active_link_entries) {
log_warn(knet_h, KNET_SUB_HOST, "host: %u has no active links", host->host_id);
_clear_cbuffers(host, 0);
} else {
reachable = 1;
}
if (host->status.reachable != reachable) {
host->status.reachable = reachable;
if (knet_h->host_status_change_notify_fn) {
knet_h->host_status_change_notify_fn(
knet_h->host_status_change_notify_fn_private_data,
host->host_id,
host->status.reachable,
host->status.remote,
host->status.external);
}
}
return 0;
}
diff --git a/libknet/internals.h b/libknet/internals.h
index 622e0eec..876a5ad4 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,500 +1,502 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __KNET_INTERNALS_H__
#define __KNET_INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include <pthread.h>
#include "libknet.h"
#include "onwire.h"
#include "compat.h"
#define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward because of circular dependancy */
struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned long long pong_timeout_adj; /* timeout adjusted for latency */
uint8_t pong_timeout_backoff; /* see link.h for definition */
unsigned int latency_fix; /* precision */
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
uint64_t flags;
/* status */
struct knet_link_status status;
/* internals */
uint8_t link_id;
uint8_t transport_type; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
int outsock;
unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/
unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */
unsigned int latency_exp;
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
uint32_t proto_overhead;
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
uint8_t has_valid_mtu;
};
#define KNET_CBUFFER_SIZE 4096
struct knet_host_defrag_buf {
char buf[KNET_DATABUFSIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */
uint8_t last_first; /* special case if we receive the last fragment first */
uint16_t frag_size; /* normal frag size (not the last one) */
uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
struct knet_host {
/* required */
knet_node_id_t host_id;
/* configurable */
uint8_t link_handler_policy;
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/* internals */
char circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t rx_seq_num;
seq_num_t untimed_rx_seq_num;
seq_num_t timed_rx_seq_num;
uint8_t got_data;
/* defrag/reassembly buffers */
struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK];
char circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next;
};
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
struct knet_fd_trackers {
uint8_t transport; /* transport type (UDP/SCTP...) */
uint8_t data_type; /* internal use for transport to define what data are associated
* to this fd */
void *data; /* pointer to the data */
};
#define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
#define KNET_MAX_COMPRESS_METHODS UINT8_MAX
struct knet_handle_stats_extra {
uint64_t tx_crypt_pmtu_packets;
uint64_t tx_crypt_pmtu_reply_packets;
uint64_t tx_crypt_ping_packets;
uint64_t tx_crypt_pong_packets;
};
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int hostsockfd[2];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
unsigned int pmtud_interval;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
struct knet_handle_stats stats;
struct knet_handle_stats_extra stats_extra;
uint32_t reconnect_int;
knet_node_id_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_header *recv_from_sock_buf;
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
int lock_init_done;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
+ int pmtud_running;
+ int pmtud_abort;
struct crypto_instance *crypto_instance;
size_t sec_header_size;
size_t sec_block_size;
size_t sec_hash_size;
size_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
int compress_model;
int compress_level;
size_t compress_threshold;
void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */
unsigned char *recv_from_links_buf_decompress;
unsigned char *send_to_links_buf_compress;
seq_num_t tx_seq_num;
pthread_mutex_t tx_seq_num_mutex;
uint8_t has_loop_link;
uint8_t loop_link;
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
};
extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */
/*
* NOTE: every single operation must be implementend
* for every protocol.
*/
typedef struct knet_transport_ops {
/*
* transport generic information
*/
const char *transport_name;
const uint8_t transport_id;
const uint8_t built_in;
uint32_t transport_mtu_overhead;
/*
* transport init must allocate the new transport
* and perform all internal initializations
* (threads, lists, etc).
*/
int (*transport_init)(knet_handle_t knet_h);
/*
* transport free must releases _all_ resources
* allocated by tranport_init
*/
int (*transport_free)(knet_handle_t knet_h);
/*
* link operations should take care of all the
* sockets and epoll management for a given link/transport set
* transport_link_disable should return err = -1 and errno = EBUSY
* if listener is still in use, and any other errno in case
* the link cannot be disabled.
*
* set_config/clear_config are invoked in global write lock context
*/
int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);
/*
* transport callback for incoming dynamic connections
* this is called in global read lock context
*/
int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);
/*
* per transport error handling of recvmmsg
* (see _handle_recv_from_links comments for details)
*/
/*
* transport_rx_sock_error is invoked when recvmmsg returns <= 0
*
* transport_rx_sock_error is invoked with both global_rdlock
*/
int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* transport_tx_sock_error is invoked with global_rwlock and
* it's invoked when sendto or sendmmsg returns =< 0
*
* it should return:
* -1 on internal error
* 0 ignore error and continue
* 1 retry
* any sleep or wait action should happen inside the transport code
*/
int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* this function is called on _every_ received packet
* to verify if the packet is data or internal protocol error handling
*
* it should return:
* -1 on error
* 0 packet is not data and we should continue the packet process loop
* 1 packet is not data and we should STOP the packet process loop
* 2 packet is data and should be parsed as such
*
* transport_rx_is_data is invoked with both global_rwlock
* and fd_tracker read lock (from RX thread)
*/
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
} knet_transport_ops_t;
socklen_t sockaddr_len(const struct sockaddr_storage *ss);
/**
* This is a kernel style list implementation.
*
* @author Steven Dake <sdake@redhat.com>
*/
struct knet_list_head {
struct knet_list_head *next;
struct knet_list_head *prev;
};
/**
* @def KNET_LIST_DECLARE()
* Declare and initialize a list head.
*/
#define KNET_LIST_DECLARE(name) \
struct knet_list_head name = { &(name), &(name) }
#define KNET_INIT_LIST_HEAD(ptr) do { \
(ptr)->next = (ptr); (ptr)->prev = (ptr); \
} while (0)
/**
* Initialize the list entry.
*
* Points next and prev pointers to head.
* @param head pointer to the list head
*/
static inline void knet_list_init(struct knet_list_head *head)
{
head->next = head;
head->prev = head;
}
/**
* Add this element to the list.
*
* @param element the new element to insert.
* @param head pointer to the list head
*/
static inline void knet_list_add(struct knet_list_head *element,
struct knet_list_head *head)
{
head->next->prev = element;
element->next = head->next;
element->prev = head;
head->next = element;
}
/**
* Add to the list (but at the end of the list).
*
* @param element pointer to the element to add
* @param head pointer to the list head
* @see knet_list_add()
*/
static inline void knet_list_add_tail(struct knet_list_head *element,
struct knet_list_head *head)
{
head->prev->next = element;
element->next = head;
element->prev = head->prev;
head->prev = element;
}
/**
* Delete an entry from the list.
*
* @param _remove the list item to remove
*/
static inline void knet_list_del(struct knet_list_head *_remove)
{
_remove->next->prev = _remove->prev;
_remove->prev->next = _remove->next;
}
/**
* Replace old entry by new one
* @param old: the element to be replaced
* @param new: the new element to insert
*/
static inline void knet_list_replace(struct knet_list_head *old,
struct knet_list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
/**
* Tests whether list is the last entry in list head
* @param list: the entry to test
* @param head: the head of the list
* @return boolean true/false
*/
static inline int knet_list_is_last(const struct knet_list_head *list,
const struct knet_list_head *head)
{
return list->next == head;
}
/**
* A quick test to see if the list is empty (pointing to it's self).
* @param head pointer to the list head
* @return boolean true/false
*/
static inline int32_t knet_list_empty(const struct knet_list_head *head)
{
return head->next == head;
}
/**
* Get the struct for this entry
* @param ptr: the &struct list_head pointer.
* @param type: the type of the struct this is embedded in.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_entry(ptr,type,member)\
((type *)((char *)(ptr)-(char*)(&((type *)0)->member)))
/**
* Get the first element from a list
* @param ptr: the &struct list_head pointer.
* @param type: the type of the struct this is embedded in.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_first_entry(ptr, type, member) \
knet_list_entry((ptr)->next, type, member)
/**
* Iterate over a list
* @param pos: the &struct list_head to use as a loop counter.
* @param head: the head for your list.
*/
#define knet_list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
/**
* Iterate over a list backwards
* @param pos: the &struct list_head to use as a loop counter.
* @param head: the head for your list.
*/
#define knet_list_for_each_reverse(pos, head) \
for (pos = (head)->prev; pos != (head); pos = pos->prev)
/**
* Iterate over a list safe against removal of list entry
* @param pos: the &struct list_head to use as a loop counter.
* @param n: another &struct list_head to use as temporary storage
* @param head: the head for your list.
*/
#define knet_list_for_each_safe(pos, n, head) \
for (pos = (head)->next, n = pos->next; pos != (head); \
pos = n, n = pos->next)
/**
* Iterate over list of given type
* @param pos: the type * to use as a loop counter.
* @param head: the head for your list.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_for_each_entry(pos, head, member) \
for (pos = knet_list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = knet_list_entry(pos->member.next, typeof(*pos), member))
#endif
diff --git a/libknet/links.c b/libknet/links.c
index 94277e87..8cd121c2 100644
--- a/libknet/links.c
+++ b/libknet/links.c
@@ -1,1081 +1,1082 @@
/*
* Copyright (C) 2012-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include "internals.h"
#include "logging.h"
#include "links.h"
#include "transports.h"
#include "host.h"
+#include "threads_common.h"
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled, unsigned int connected)
{
struct knet_link *link = &knet_h->host_index[host_id]->link[link_id];
if ((link->status.enabled == enabled) &&
(link->status.connected == connected))
return 0;
link->status.enabled = enabled;
link->status.connected = connected;
_host_dstcache_update_async(knet_h, knet_h->host_index[host_id]);
if ((link->status.dynconnected) &&
(!link->status.connected))
link->status.dynconnected = 0;
if (connected) {
time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
link->status.stats.up_count++;
if (++link->status.stats.last_up_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_up_time_index = 0;
}
} else {
time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]);
link->status.stats.down_count++;
if (++link->status.stats.last_down_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_down_time_index = 0;
}
}
return 0;
}
void _link_clear_stats(knet_handle_t knet_h)
{
struct knet_host *host;
struct knet_link *link;
uint32_t host_id;
uint8_t link_id;
for (host_id = 0; host_id < KNET_MAX_HOST; host_id++) {
host = knet_h->host_index[host_id];
if (!host) {
continue;
}
for (link_id = 0; link_id < KNET_MAX_LINK; link_id++) {
link = &host->link[link_id];
memset(&link->status.stats, 0, sizeof(struct knet_link_stats));
}
}
}
int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint64_t flags)
{
int savederrno = 0, err = 0, i;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
if (knet_h->host_id == host_id && knet_h->has_loop_link) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) {
for (i=0; i<KNET_MAX_LINK; i++) {
if (host->link[i].configured) {
log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured.");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
}
}
link = &host->link[link_id];
if (link->configured != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
} else {
link->dynamic = KNET_LINK_STATIC;
memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
}
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */
link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \
((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000);
link->flags = flags;
if (transport_link_set_config(knet_h, link, transport) < 0) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
link->configured = 1;
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured",
host_id, link_id);
if (transport == KNET_TRANSPORT_LOOPBACK) {
knet_h->has_loop_link = 1;
knet_h->loop_link = link_id;
host->status.reachable = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic,
uint64_t *flags)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (!dynamic) {
errno = EINVAL;
return -1;
}
if (!transport) {
errno = EINVAL;
return -1;
}
if (!flags) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) {
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage));
*transport = link->transport_type;
*flags = link->flags;
if (link->dynamic == KNET_LINK_STATIC) {
*dynamic = 0;
memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage));
} else {
*dynamic = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (link->configured != 1) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
if (knet_h->has_loop_link && link_id == knet_h->loop_link) {
knet_h->has_loop_link = 0;
if (host->active_link_entries == 0) {
host->status.reachable = 0;
}
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
savederrno = errno;
if (enabled) {
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int *enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!enabled) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*enabled = link->status.enabled;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (pong_count < 1) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->pong_count = pong_count;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u pong count update: %u",
host_id, link_id, link->pong_count);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!pong_count) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*pong_count = link->pong_count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = ENOSYS;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->ping_interval = interval * 1000; /* microseconds */
link->pong_timeout = timeout * 1000; /* microseconds */
link->latency_fix = precision;
link->latency_exp = precision - \
((link->ping_interval * precision) / 8000000);
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
host_id, link_id, link->ping_interval, link->pong_timeout, precision);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*interval = link->ping_interval / 1000; /* microseconds */
*timeout = link->pong_timeout / 1000;
*precision = link->latency_fix;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
uint8_t old_priority;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
old_priority = link->priority;
if (link->priority == priority) {
err = 0;
goto exit_unlock;
}
link->priority = priority;
if (_host_dstcache_update_sync(knet_h, host)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_LINK,
"Unable to update link priority (host: %u link: %u priority: %u): %s",
host_id, link_id, link->priority, strerror(savederrno));
link->priority = old_priority;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u priority set to: %u",
host_id, link_id, link->priority);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!priority) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*priority = link->priority;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *link_ids, size_t *link_ids_entries)
{
int savederrno = 0, err = 0, i, count = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!link_ids) {
errno = EINVAL;
return -1;
}
if (!link_ids_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
for (i = 0; i < KNET_MAX_LINK; i++) {
link = &host->link[i];
if (!link->configured) {
continue;
}
link_ids[count] = i;
count++;
}
*link_ids_entries = count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct knet_link_status *status, size_t struct_size)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &link->status, struct_size);
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
status->stats.rx_ping_packets +
status->stats.rx_pong_packets +
status->stats.rx_pmtu_packets;
status->stats.tx_total_packets =
status->stats.tx_data_packets +
status->stats.tx_ping_packets +
status->stats.tx_pong_packets +
status->stats.tx_pmtu_packets;
status->stats.rx_total_bytes =
status->stats.rx_data_bytes +
status->stats.rx_ping_bytes +
status->stats.rx_pong_bytes +
status->stats.rx_pmtu_bytes;
status->stats.tx_total_bytes =
status->stats.tx_data_bytes +
status->stats.tx_ping_bytes +
status->stats.tx_pong_bytes +
status->stats.tx_pmtu_bytes;
status->stats.tx_total_errors =
status->stats.tx_data_errors +
status->stats.tx_ping_errors +
status->stats.tx_pong_errors +
status->stats.tx_pmtu_errors;
status->stats.tx_total_retries =
status->stats.tx_data_retries +
status->stats.tx_ping_retries +
status->stats.tx_pong_retries +
status->stats.tx_pmtu_retries;
/* Tell the caller our full size in case they have an old version */
status->size = sizeof(struct knet_link_status);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
diff --git a/libknet/logging.c b/libknet/logging.c
index ea1bd364..bffd470d 100644
--- a/libknet/logging.c
+++ b/libknet/logging.c
@@ -1,263 +1,264 @@
/*
* Copyright (C) 2010-2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <stdarg.h>
#include <errno.h>
#include <stdio.h>
#include "internals.h"
#include "logging.h"
+#include "threads_common.h"
struct pretty_names {
const char *name;
uint8_t val;
};
static struct pretty_names subsystem_names[] =
{
{ "common", KNET_SUB_COMMON },
{ "handle", KNET_SUB_HANDLE },
{ "host", KNET_SUB_HOST },
{ "listener", KNET_SUB_LISTENER },
{ "link", KNET_SUB_LINK },
{ "transport", KNET_SUB_TRANSPORT },
{ "crypto", KNET_SUB_CRYPTO },
{ "compress", KNET_SUB_COMPRESS },
{ "filter", KNET_SUB_FILTER },
{ "dstcache", KNET_SUB_DSTCACHE },
{ "heartbeat", KNET_SUB_HEARTBEAT },
{ "pmtud", KNET_SUB_PMTUD },
{ "tx", KNET_SUB_TX },
{ "rx", KNET_SUB_RX },
{ "loopback", KNET_SUB_TRANSP_LOOPBACK },
{ "udp", KNET_SUB_TRANSP_UDP },
{ "sctp", KNET_SUB_TRANSP_SCTP },
{ "nsscrypto", KNET_SUB_NSSCRYPTO },
{ "opensslcrypto", KNET_SUB_OPENSSLCRYPTO },
{ "zlibcomp", KNET_SUB_ZLIBCOMP },
{ "lz4comp", KNET_SUB_LZ4COMP },
{ "lz4hccomp", KNET_SUB_LZ4HCCOMP },
{ "lzo2comp", KNET_SUB_LZO2COMP },
{ "lzmacomp", KNET_SUB_LZMACOMP },
{ "bzip2comp", KNET_SUB_BZIP2COMP },
{ "unknown", KNET_SUB_UNKNOWN } /* unknown MUST always be last in this array */
};
const char *knet_log_get_subsystem_name(uint8_t subsystem)
{
unsigned int i;
for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
break;
}
if (subsystem_names[i].val == subsystem) {
return subsystem_names[i].name;
}
}
return "unknown";
}
uint8_t knet_log_get_subsystem_id(const char *name)
{
unsigned int i;
for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
break;
}
if (strcasecmp(name, subsystem_names[i].name) == 0) {
return subsystem_names[i].val;
}
}
return KNET_SUB_UNKNOWN;
}
static int is_valid_subsystem(uint8_t subsystem)
{
unsigned int i;
for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
if ((subsystem != KNET_SUB_UNKNOWN) &&
(subsystem_names[i].val == KNET_SUB_UNKNOWN)) {
break;
}
if (subsystem_names[i].val == subsystem) {
return 0;
}
}
return -1;
}
static struct pretty_names loglevel_names[] =
{
{ "ERROR", KNET_LOG_ERR },
{ "WARNING", KNET_LOG_WARN },
{ "info", KNET_LOG_INFO },
{ "debug", KNET_LOG_DEBUG }
};
const char *knet_log_get_loglevel_name(uint8_t level)
{
unsigned int i;
for (i = 0; i <= KNET_LOG_DEBUG; i++) {
if (loglevel_names[i].val == level) {
return loglevel_names[i].name;
}
}
return "ERROR";
}
uint8_t knet_log_get_loglevel_id(const char *name)
{
unsigned int i;
for (i = 0; i <= KNET_LOG_DEBUG; i++) {
if (strcasecmp(name, loglevel_names[i].name) == 0) {
return loglevel_names[i].val;
}
}
return KNET_LOG_ERR;
}
int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t level)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (is_valid_subsystem(subsystem) < 0) {
errno = EINVAL;
return -1;
}
if (level > KNET_LOG_DEBUG) {
errno = EINVAL;
return -1;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, subsystem, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->log_levels[subsystem] = level;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
uint8_t *level)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (is_valid_subsystem(subsystem) < 0) {
errno = EINVAL;
return -1;
}
if (!level) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, subsystem, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*level = knet_h->log_levels[subsystem];
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel,
const char *fmt, ...)
{
va_list ap;
struct knet_log_msg msg;
size_t byte_cnt = 0;
int len, err;
if ((!knet_h) ||
(subsystem == KNET_MAX_SUBSYSTEMS) ||
(msglevel > knet_h->log_levels[subsystem]))
return;
/*
* most logging calls will take place with locking in place.
* if we get an EINVAL and locking is initialized, then
* we are getting a real error and we need to stop
*/
err = pthread_rwlock_tryrdlock(&knet_h->global_rwlock);
if ((err == EAGAIN) && (knet_h->lock_init_done))
return;
if (knet_h->logfd <= 0)
goto out_unlock;
memset(&msg, 0, sizeof(struct knet_log_msg));
msg.subsystem = subsystem;
msg.msglevel = msglevel;
va_start(ap, fmt);
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wformat-nonliteral"
#endif
vsnprintf(msg.msg, sizeof(msg.msg) - 2, fmt, ap);
#ifdef __clang__
#pragma clang diagnostic pop
#endif
va_end(ap);
len = strlen(msg.msg);
msg.msg[len+1] = '\n';
while (byte_cnt < sizeof(struct knet_log_msg)) {
len = write(knet_h->logfd, &msg, sizeof(struct knet_log_msg) - byte_cnt);
if (len <= 0) {
goto out_unlock;
}
byte_cnt += len;
}
out_unlock:
/*
* unlock only if we are holding the lock
*/
if (!err)
pthread_rwlock_unlock(&knet_h->global_rwlock);
return;
}
diff --git a/libknet/tests/Makefile.am b/libknet/tests/Makefile.am
index dc00351a..8afac09d 100644
--- a/libknet/tests/Makefile.am
+++ b/libknet/tests/Makefile.am
@@ -1,69 +1,70 @@
#
# Copyright (C) 2016-2017 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
#
# This software licensed under GPL-2.0+, LGPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
include $(top_srcdir)/libknet/tests/api-check.mk
EXTRA_DIST = \
api-test-coverage \
api-check.mk
AM_CPPFLAGS = -I$(top_srcdir)/libknet
AM_CFLAGS = $(PTHREAD_CFLAGS)
LIBS = $(top_builddir)/libknet/libknet.la \
$(PTHREAD_LIBS) $(dl_LIBS)
noinst_HEADERS = \
test-common.h
# the order of those tests is NOT random.
# some functions can only be tested properly after some dependents
# API have been validated upfront.
check_PROGRAMS = \
$(api_checks) \
$(int_checks) \
$(fun_checks)
int_checks = \
int_timediff_test
fun_checks =
benchmarks = \
knet_bench_test
noinst_PROGRAMS = \
api_knet_handle_new_limit_test \
pckt_test \
$(benchmarks) \
$(check_PROGRAMS)
noinst_SCRIPTS = \
api-test-coverage
TESTS = $(check_PROGRAMS)
check-local: check-api-test-coverage
check-api-test-coverage:
chmod u+x $(top_srcdir)/libknet/tests/api-test-coverage
$(top_srcdir)/libknet/tests/api-test-coverage $(top_srcdir) $(top_builddir)
pckt_test_SOURCES = pckt_test.c
int_timediff_test_SOURCES = int_timediff.c
knet_bench_test_SOURCES = knet_bench.c \
test-common.c \
../common.c \
../logging.c \
../compat.c \
- ../transport_common.c
+ ../transport_common.c \
+ ../threads_common.c
diff --git a/libknet/threads_common.c b/libknet/threads_common.c
index f40d68b5..9781f052 100644
--- a/libknet/threads_common.c
+++ b/libknet/threads_common.c
@@ -1,38 +1,63 @@
/*
* Copyright (C) 2016-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include "internals.h"
#include "logging.h"
#include "threads_common.h"
int shutdown_in_progress(knet_handle_t knet_h)
{
int savederrno = 0;
int ret;
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMMON, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
ret = knet_h->fini_in_progress;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return ret;
}
+
+static int pmtud_reschedule(knet_handle_t knet_h)
+{
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
+ return -1;
+ }
+
+ knet_h->pmtud_abort = 1;
+
+ if (knet_h->pmtud_running) {
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ }
+
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ return 0;
+}
+
+int get_global_wrlock(knet_handle_t knet_h)
+{
+ if (pmtud_reschedule(knet_h) < 0) {
+ log_info(knet_h, KNET_SUB_PMTUD, "Unable to notify PMTUd to reschedule. Expect delays in executing API calls");
+ }
+ return pthread_rwlock_wrlock(&knet_h->global_rwlock);
+}
diff --git a/libknet/threads_common.h b/libknet/threads_common.h
index 348fa05c..0dce25e1 100644
--- a/libknet/threads_common.h
+++ b/libknet/threads_common.h
@@ -1,28 +1,29 @@
/*
* Copyright (C) 2012-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __KNET_THREADS_COMMON_H__
#define __KNET_THREADS_COMMON_H__
#include "internals.h"
#define KNET_THREADS_TIMERES 200000
#define timespec_diff(start, end, diff) \
do { \
if (end.tv_sec > start.tv_sec) \
*(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \
+ end.tv_nsec - start.tv_nsec; \
else \
*(diff) = end.tv_nsec - start.tv_nsec; \
} while (0);
int shutdown_in_progress(knet_handle_t knet_h);
+int get_global_wrlock(knet_handle_t knet_h);
#endif
diff --git a/libknet/threads_dsthandler.c b/libknet/threads_dsthandler.c
index 138d64a7..bcc1f987 100644
--- a/libknet/threads_dsthandler.c
+++ b/libknet/threads_dsthandler.c
@@ -1,61 +1,62 @@
/*
* Copyright (C) 2015-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <pthread.h>
#include "host.h"
#include "compat.h"
#include "logging.h"
#include "threads_common.h"
#include "threads_dsthandler.h"
+#include "threads_pmtud.h"
static void _handle_dst_link_updates(knet_handle_t knet_h)
{
knet_node_id_t host_id;
struct knet_host *host;
if (recv(knet_h->dstsockfd[0], &host_id, sizeof(host_id), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(host_id)) {
log_debug(knet_h, KNET_SUB_DSTCACHE, "Short read on dstsockfd");
return;
}
- if (pthread_rwlock_wrlock(&knet_h->global_rwlock) != 0) {
+ if (get_global_wrlock(knet_h) != 0) {
log_debug(knet_h, KNET_SUB_DSTCACHE, "Unable to get read lock");
return;
}
host = knet_h->host_index[host_id];
if (!host) {
log_debug(knet_h, KNET_SUB_DSTCACHE, "Unable to find host: %u", host_id);
goto out_unlock;
}
_host_dstcache_update_sync(knet_h, host);
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
return;
}
void *_handle_dst_link_handler_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
while (!shutdown_in_progress(knet_h)) {
if (epoll_wait(knet_h->dst_link_handler_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1) >= 1)
_handle_dst_link_updates(knet_h);
}
return NULL;
}
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index 38293567..bae2838a 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,453 +1,488 @@
/*
* Copyright (C) 2015-2017 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "links.h"
#include "host.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_pmtud.h"
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int err, ret, savederrno, mutex_retry_limit, failsafe;
size_t onwire_len; /* current packet onwire size */
size_t overhead_len; /* onwire packet overhead (protocol based) */
size_t max_mtu_len; /* max mtu for protocol */
size_t data_len; /* how much data we can send in the packet
* generally would be onwire_len - overhead_len
* needs to be adjusted for crypto
*/
size_t pad_len; /* crypto packet pad size, needs to move into crypto.c callbacks */
ssize_t len; /* len of what we were able to sendto onwire */
struct timespec ts;
unsigned long long pong_timeout_adj_tmp;
unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;
mutex_retry_limit = 0;
failsafe = 0;
pad_len = 0;
dst_link->last_bad_mtu = 0;
knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
max_mtu_len = KNET_PMTUD_SIZE_V6;
overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
break;
case AF_INET:
max_mtu_len = KNET_PMTUD_SIZE_V4;
overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
dst_link->last_good_mtu = dst_link->last_ping_size + overhead_len;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
return -1;
break;
}
/*
* discovery starts from the top because kernel will
* refuse to send packets > current iface mtu.
* this saves us some time and network bw.
*/
onwire_len = max_mtu_len;
restart:
/*
* prevent a race when interface mtu is changed _exactly_ during
* the discovery process and it's complex to detect. Easier
* to wait the next loop.
* 30 is not an arbitrary value. To bisect from 576 to 128000 doesn't
* take more than 18/19 steps.
*/
if (failsafe == 30) {
log_err(knet_h, KNET_SUB_PMTUD,
"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
return -1;
} else {
failsafe++;
}
data_len = onwire_len - overhead_len;
if (knet_h->crypto_instance) {
if (knet_h->sec_block_size) {
pad_len = knet_h->sec_block_size - (data_len % knet_h->sec_block_size);
if (pad_len == knet_h->sec_block_size) {
pad_len = 0;
}
data_len = data_len + pad_len;
}
data_len = data_len + (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
if (knet_h->sec_block_size) {
while (data_len + overhead_len >= max_mtu_len) {
data_len = data_len - knet_h->sec_block_size;
}
}
if (dst_link->last_bad_mtu) {
while (data_len + overhead_len >= dst_link->last_bad_mtu) {
data_len = data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size);
}
}
if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size) + 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
return -1;
}
onwire_len = data_len + overhead_len;
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pmtudbuf,
data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size + knet_h->sec_block_size),
knet_h->pmtudbuf_crypt,
(ssize_t *)&data_len) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
return -1;
}
outbuf = knet_h->pmtudbuf_crypt;
knet_h->stats_extra.tx_crypt_pmtu_packets++;
} else {
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
}
/* link has gone down, aborting pmtud */
if (dst_link->status.connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (dst_link->transport_connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
+ if (knet_h->pmtud_abort) {
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ errno = EDEADLK;
+ return -1;
+ }
+
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
return -1;
}
retry:
len = sendto(dst_link->outsock, outbuf, data_len,
MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &dst_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
err = transport_tx_sock_error(knet_h, dst_link->transport_type, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
pthread_mutex_unlock(&knet_h->tx_mutex);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
dst_link->status.stats.tx_pmtu_errors++;
return -1;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
dst_link->status.stats.tx_pmtu_retries++;
goto retry;
break;
}
pthread_mutex_unlock(&knet_h->tx_mutex);
if (len != (ssize_t )data_len) {
if (savederrno == EMSGSIZE) {
dst_link->last_bad_mtu = onwire_len;
} else {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
}
} else {
dst_link->last_sent_mtu = onwire_len;
dst_link->last_recv_mtu = 0;
dst_link->status.stats.tx_pmtu_packets++;
dst_link->status.stats.tx_pmtu_bytes += data_len;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
/*
* set PMTUd reply timeout to match pong_timeout on a given link
*
* math: internally pong_timeout is expressed in microseconds, while
* the public API exports milliseconds. So careful with the 0's here.
* the loop is necessary because we are grabbing the current time just above
* and add values to it that could overflow into seconds.
*/
if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex");
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
if (knet_h->crypto_instance) {
/*
* crypto, under pressure, is a royal PITA
*/
pong_timeout_adj_tmp = dst_link->pong_timeout_adj * 2;
} else {
pong_timeout_adj_tmp = dst_link->pong_timeout_adj;
}
ts.tv_sec += pong_timeout_adj_tmp / 1000000;
ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000);
while (ts.tv_nsec > 1000000000) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
pthread_mutex_unlock(&knet_h->backoff_mutex);
+ knet_h->pmtud_running = 1;
+
ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);
+ knet_h->pmtud_running = 0;
+
+ if (knet_h->pmtud_abort) {
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ errno = EDEADLK;
+ return -1;
+ }
+
if (shutdown_in_progress(knet_h)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
return -1;
}
if ((ret != 0) && (ret != ETIMEDOUT)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (mutex_retry_limit == 3) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
return -1;
}
mutex_retry_limit++;
goto restart;
}
if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
dst_link->last_bad_mtu = onwire_len;
} else {
int found_mtu = 0;
if (knet_h->sec_block_size) {
if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
found_mtu = 1;
}
} else {
if ((onwire_len == max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) ||
(dst_link->last_bad_mtu == dst_link->last_good_mtu)) {
found_mtu = 1;
}
}
if (found_mtu) {
/*
* account for IP overhead, knet headers and crypto in PMTU calculation
*/
dst_link->status.mtu = onwire_len - dst_link->status.proto_overhead;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
dst_link->last_good_mtu = onwire_len;
}
}
onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
/*
* give time to the kernel to determine its own version of MTU
*/
usleep((dst_link->status.stats.latency_ave * 1000) / 4);
goto restart;
}
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, unsigned int *min_mtu)
{
uint8_t saved_valid_pmtud;
unsigned int saved_pmtud;
struct timespec clock_now;
unsigned long long diff_pmtud, interval;
interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
return 0;
}
timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);
if (diff_pmtud < interval) {
*min_mtu = dst_link->status.mtu;
return dst_link->has_valid_mtu;
}
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
case AF_INET:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_header_size;
break;
}
saved_pmtud = dst_link->status.mtu;
saved_valid_pmtud = dst_link->has_valid_mtu;
log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);
if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
+ if (errno == EDEADLK) {
+ log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id);
+ dst_link->status.mtu = saved_pmtud;
+ dst_link->has_valid_mtu = saved_valid_pmtud;
+ errno = EDEADLK;
+ return dst_link->has_valid_mtu;
+ }
dst_link->has_valid_mtu = 0;
} else {
dst_link->has_valid_mtu = 1;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V6) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V6)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv6 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
case AF_INET:
if (((dst_link->status.mtu + dst_link->status.proto_overhead) < KNET_PMTUD_MIN_MTU_V4) ||
((dst_link->status.mtu + dst_link->status.proto_overhead) > KNET_PMTUD_SIZE_V4)) {
log_debug(knet_h, KNET_SUB_PMTUD,
"PMTUD detected an IPv4 MTU out of bound value (%u) for host: %u link: %u.",
dst_link->status.mtu + dst_link->status.proto_overhead, dst_host->host_id, dst_link->link_id);
dst_link->has_valid_mtu = 0;
}
break;
}
if (dst_link->has_valid_mtu) {
if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
}
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
if (dst_link->status.mtu < *min_mtu) {
*min_mtu = dst_link->status.mtu;
}
/*
* set pmtud_last, if we can, after we are done with the PMTUd process
* because it can take a very long time.
*/
dst_link->pmtud_last = clock_now;
if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) {
dst_link->pmtud_last = clock_now;
}
}
}
if (saved_valid_pmtud != dst_link->has_valid_mtu) {
_host_dstcache_update_sync(knet_h, dst_host);
}
return dst_link->has_valid_mtu;
}
void *_handle_pmtud_link_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
unsigned int min_mtu, have_mtu;
unsigned int lower_mtu;
+ int link_has_mtu;
knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
+ continue;
+ }
+ knet_h->pmtud_abort = 0;
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
continue;
}
lower_mtu = KNET_PMTUD_SIZE_V4;
min_mtu = KNET_PMTUD_SIZE_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
have_mtu = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
dst_link = &dst_host->link[link_idx];
if ((dst_link->status.enabled != 1) ||
(dst_link->status.connected != 1) ||
(dst_host->link[link_idx].transport_type == KNET_TRANSPORT_LOOPBACK) ||
(!dst_link->last_ping_size) ||
((dst_link->dynamic == KNET_LINK_DYNIP) &&
(dst_link->status.dynconnected != 1)))
continue;
- if (_handle_check_pmtud(knet_h, dst_host, dst_link, &min_mtu)) {
+ link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, &min_mtu);
+ if (errno == EDEADLK) {
+ goto out_unlock;
+ }
+ if (link_has_mtu) {
have_mtu = 1;
if (min_mtu < lower_mtu) {
lower_mtu = min_mtu;
}
}
}
}
if (have_mtu) {
if (knet_h->data_mtu != lower_mtu) {
knet_h->data_mtu = lower_mtu;
log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);
if (knet_h->pmtud_notify_fn) {
knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
knet_h->data_mtu);
}
}
}
-
+out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index 4ebe74a5..ed30e2dd 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1417 +1,1417 @@
/*
* Copyright (C) 2016-2017 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include "compat.h"
#include "host.h"
#include "links.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
#include "threads_common.h"
#ifdef HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
#include "transport_sctp.h"
typedef struct sctp_handle_info {
struct knet_list_head listen_links_list;
struct knet_list_head connect_links_list;
int connect_epollfd;
int connectsockfd[2];
int listen_epollfd;
int listensockfd[2];
pthread_t connect_thread;
pthread_t listen_thread;
} sctp_handle_info_t;
/*
* use by fd_tracker data type
*/
#define SCTP_NO_LINK_INFO 0
#define SCTP_LISTENER_LINK_INFO 1
#define SCTP_ACCEPTED_LINK_INFO 2
#define SCTP_CONNECT_LINK_INFO 3
/*
* this value is per listener
*/
#define MAX_ACCEPTED_SOCKS 256
typedef struct sctp_listen_link_info {
struct knet_list_head list;
int listen_sock;
int accepted_socks[MAX_ACCEPTED_SOCKS];
struct sockaddr_storage src_address;
int on_listener_epoll;
int on_rx_epoll;
} sctp_listen_link_info_t;
typedef struct sctp_accepted_link_info {
char mread_buf[KNET_DATABUFSIZE];
ssize_t mread_len;
sctp_listen_link_info_t *link_info;
} sctp_accepted_link_info_t ;
typedef struct sctp_connect_link_info {
struct knet_list_head list;
sctp_listen_link_info_t *listener;
struct knet_link *link;
struct sockaddr_storage dst_address;
int connect_sock;
int on_connected_epoll;
int on_rx_epoll;
int close_sock;
} sctp_connect_link_info_t;
/*
* socket handling functions
*
* those functions do NOT perform locking. locking
* should be handled in the right context from callers
*/
/*
* sockets are removed from rx_epoll from callers
* see also error handling functions
*/
static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
if (info->on_connected_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = info->connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from the epoll pool: %s",
strerror(errno));
goto exit_error;
}
info->on_connected_epoll = 0;
}
exit_error:
if (info->connect_sock != -1) {
if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
close(info->connect_sock);
info->connect_sock = -1;
}
errno = savederrno;
return err;
}
static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
{
int err = 0, savederrno = 0;
struct sctp_event_subscribe events;
memset(&events, 0, sizeof (events));
events.sctp_data_io_event = 1;
events.sctp_association_event = 1;
events.sctp_send_failure_event = 1;
events.sctp_address_event = 1;
events.sctp_peer_error_event = 1;
events.sctp_shutdown_event = 1;
if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, &events, sizeof (events)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
type, strerror(savederrno));
}
errno = savederrno;
return err;
}
static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
int level;
#ifdef SOL_SCTP
level = SOL_SCTP;
#else
level = IPPROTO_SCTP;
#endif
if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
value = 1;
if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
strerror(savederrno));
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
savederrno = errno;
err = -1;
}
exit_error:
errno = savederrno;
return err;
}
static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) {
if ((errno != EALREADY) && (errno != EINPROGRESS) && (errno != EISCONN)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
info->connect_sock, strerror(savederrno));
goto exit_error;
}
}
if (!info->on_connected_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = info->connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add send/recv to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_connected_epoll = 1;
}
exit_error:
errno = savederrno;
return err;
}
static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event ev;
int connect_sock;
connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (connect_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->connect_sock = connect_sock;
info->close_sock = 0;
if (_reconnect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
exit_error:
if (err) {
if (info->on_connected_epoll) {
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev);
}
if (connect_sock >= 0) {
close(connect_sock);
}
}
errno = savederrno;
return err;
}
int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
if (recv_err < 0) {
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
if (connect_info->link->transport_connected == 0) {
return -1;
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (listen_info->on_rx_epoll == 0) {
return -1;
}
}
break;
}
if (recv_errno == EAGAIN) {
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
usleep(KNET_THREADS_TIMERES / 16);
pthread_rwlock_rdlock(&knet_h->global_rwlock);
return 1;
}
return -1;
}
return 0;
}
/*
* socket error management functions
*
* both called with global read lock.
*
* NOTE: we need to remove the fd from the epoll as soon as possible
* even before we notify the respective thread to take care of it
* because scheduling can make it so that this thread will overload
* and the threads supposed to take care of the error will never
* be able to take action.
* we CANNOT handle FDs here diretly (close/reconnect/etc) due
* to locking context. We need to delegate that to their respective
* management threads within global write lock.
*
* this function is called from:
* - RX thread with recv_err <= 0 directly on recvmmsg error
* - transport_rx_is_data when msg_len == 0 (recv_err = 1)
* - transport_rx_is_data on notification (recv_err = 2)
*
* basically this small abouse of recv_err is to detect notifications
* generated by sockets created by listen().
*/
int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
struct epoll_event ev;
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
/*
* all connect link have notifications enabled
* and we accept only data from notification and
* generic recvmmsg errors.
*
* Errors generated by msg_len 0 can be ignored because
* they follow a notification (double notification)
*/
if (recv_err != 1) {
connect_info->link->transport_connected = 0;
if (connect_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
connect_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (recv_err != 1) {
if (listen_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
listen_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
}
}
} else {
/*
* this means the listen() socket has generated
* a notification. now what? :-)
*/
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
}
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
break;
}
/*
* Under RX pressure we need to give time to IPC to pick up the message
*/
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
usleep(KNET_THREADS_TIMERES / 2);
pthread_rwlock_rdlock(&knet_h->global_rwlock);
return 0;
}
/*
* NOTE: sctp_transport_rx_is_data is called with global rdlock
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
size_t i;
struct iovec *iov = msg->msg_hdr.msg_iov;
size_t iovlen = msg->msg_hdr.msg_iovlen;
struct sctp_assoc_change *sac;
union sctp_notification *snp;
sctp_accepted_link_info_t *info = knet_h->knet_transport_fd_tracker[sockfd].data;
if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
if (msg->msg_len == 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "received 0 bytes len packet: %d", sockfd);
/*
* NOTE: with event notification enabled, we receive error twice:
* 1) from the event notification
* 2) followed by a 0 byte msg_len
*
* This is generally not a problem if not for causing extra
* handling for the same issue. Should we drop notifications
* and keep the code generic (handle all errors via msg_len = 0)
* or keep the duplication as safety measure, or drop msg_len = 0
* handling (what about sockets without events enabled?)
*/
sctp_transport_rx_sock_error(knet_h, sockfd, 1, 0);
return 1;
}
/*
* missing MSG_EOR has to be treated as a short read
* from the socket and we need to fill in the mread buf
* while we wait for MSG_EOR
*/
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
/*
* copy the incoming data into mread_buf + mread_len (incremental)
* and increase mread_len
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
return 0;
}
/*
* got EOR.
* if mread_len is > 0 we are completing a packet from short reads
* complete reassembling the packet in mread_buf, copy it back in the iov
* and set the iov/msg len numbers (size) correctly
*/
if (info->mread_len) {
/*
* add last fragment to mread_buf
*/
memmove(info->mread_buf + info->mread_len, iov->iov_base, msg->msg_len);
info->mread_len = info->mread_len + msg->msg_len;
/*
* move all back into the iovec
*/
memmove(iov->iov_base, info->mread_buf, info->mread_len);
msg->msg_len = info->mread_len;
info->mread_len = 0;
}
return 2;
}
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
return 1;
}
for (i=0; i< iovlen; i++) {
snp = iov[i].iov_base;
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change");
sac = &snp->sn_assoc_change;
if (sac->sac_state == SCTP_COMM_LOST) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change: comm_lost");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
}
break;
case SCTP_SHUTDOWN_EVENT:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event");
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
break;
case SCTP_SEND_FAILED:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed");
break;
case SCTP_PEER_ADDR_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change");
break;
case SCTP_REMOTE_ERROR:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error");
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event type: %hu\n", snp->sn_header.sn_type);
break;
}
}
return 0;
}
/*
* connect / outgoing socket management thread
*/
/*
* _handle_connected_sctp* are called with a global write lock
* from the connect_thread
*/
static void _handle_connected_sctp(knet_handle_t knet_h, int connect_sock)
{
int err;
struct epoll_event ev;
unsigned int status, len = sizeof(status);
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
struct knet_link *kn_link = info->link;
err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
if (err) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
connect_sock, strerror(errno));
return;
}
if (info->close_sock) {
if (_close_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp: %s", connect_sock, strerror(errno));
return;
}
info->close_sock = 0;
if (_create_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
return;
}
}
if (status) {
log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
strerror(status));
/*
* No need to create a new socket if connect failed,
* just retry connect
*/
_reconnect_socket(knet_h, info->link);
return;
}
/*
* Connected - Remove us from the connect epoll
*/
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLOUT;
ev.data.fd = connect_sock;
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket %d from epoll pool: %s",
connect_sock, strerror(errno));
}
info->on_connected_epoll = 0;
kn_link->transport_connected = 1;
kn_link->outsock = info->connect_sock;
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
connect_sock,
kn_link->status.dst_ipaddr, kn_link->status.dst_port);
}
static void _handle_connected_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info;
if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd);
info = knet_h->knet_transport_fd_tracker[sockfd].data;
info->close_sock = 1;
info->link->transport_connected = 0;
_reconnect_socket(knet_h, info->link);
}
static void *_sctp_connect_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
/*
* Sort out which FD has a connection
*/
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* minor optimization: deduplicate events
*
* in some cases we can receive multiple notifcations
* of the same FD having issues or need handling.
* It's enough to process it once even tho it's safe
* to handle them multiple times.
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->connectsockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
_handle_connected_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_connected_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for dead fd %d\n", events[i].data.fd);
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* this thread can generate events for itself.
* we need to sleep in between loops to allow other threads
* to be scheduled
*/
usleep(knet_h->reconnect_int * 1000);
}
return NULL;
}
/*
* listen/incoming connections management thread
*/
/*
* Listener received a new connection
* called with a write lock from main thread
*/
static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
{
int err = 0, savederrno = 0;
int new_fd;
int i = -1;
sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
struct epoll_event ev;
struct sockaddr_storage ss;
socklen_t sock_len = sizeof(ss);
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
sctp_accepted_link_info_t *accept_info = NULL;
new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
if (new_fd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno));
goto exit_error;
}
if (knet_addrtostr(&ss, sizeof(ss),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
addr_str, port_str);
/*
* Keep a track of all accepted FDs
*/
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] == -1) {
info->accepted_socks[i] = new_fd;
break;
}
}
if (i == MAX_ACCEPTED_SOCKS) {
errno = EBUSY;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
goto exit_error;
}
if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
savederrno = errno;
err = -1;
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
accept_info = malloc(sizeof(sctp_accepted_link_info_t));
if (!accept_info) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));
accept_info->link_info = info;
if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(errno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = new_fd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
new_fd, strerror(errno));
goto exit_error;
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
new_fd, addr_str, port_str, info->listen_sock, i);
exit_error:
if (err) {
if ((i >= 0) || (i < MAX_ACCEPTED_SOCKS)) {
info->accepted_socks[i] = -1;
}
_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
free(accept_info);
close(new_fd);
}
errno = savederrno;
return;
}
/*
* Listen thread received a notification of a bad socket that needs closing
* called with a write lock from main thread
*/
static void _handle_listen_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_accepted_link_info_t *accept_info;
sctp_listen_link_info_t *info;
struct knet_host *host;
int link_idx;
int i;
if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);
accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
info = accept_info->link_info;
/*
* clear all links using this accepted socket as
* outbound dynamically connected socket
*/
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(host->link[link_idx].outsock == sockfd)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
host->host_id, link_idx, sockfd);
host->link[link_idx].status.dynconnected = 0;
host->link[link_idx].transport_connected = 0;
host->link[link_idx].outsock = 0;
memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
}
}
}
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (sockfd == info->accepted_socks[i]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
info->accepted_socks[i] = -1;
free(accept_info);
close(sockfd);
}
}
}
static void *_sctp_listen_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* Sort out which FD has an incoming connection
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->listensockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
_handle_listen_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_incoming_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}
/*
* sctp_link_listener_start/stop are called in global write lock
* context from set_config and clear_config.
*/
static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int listen_sock = -1;
struct epoll_event ev;
sctp_listen_link_info_t *info = NULL;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* Only allocate a new listener if src address is different
*/
knet_list_for_each_entry(info, &handle_info->listen_links_list, list) {
if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
return info;
}
}
info = malloc(sizeof(sctp_listen_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(sctp_listen_link_info_t));
memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (listen_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (listen(listen_sock, 5) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 1;
info->listen_sock = listen_sock;
knet_list_add(&info->list, &handle_info->listen_links_list);
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);
exit_error:
if (err) {
if (info->on_listener_epoll) {
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
}
if (listen_sock >= 0) {
close(listen_sock);
}
if (info) {
free(info);
info = NULL;
}
}
errno = savederrno;
return info;
}
static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int found = 0, i;
struct knet_host *host;
int link_idx;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
sctp_connect_link_info_t *link_info;
struct epoll_event ev;
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&host->link[link_idx] == kn_link)
continue;
link_info = host->link[link_idx].transport_link;
if ((link_info) &&
(link_info->listener == info) &&
(host->link[link_idx].status.enabled == 1)) {
found = 1;
break;
}
}
}
if (found) {
this_link_info->listener = NULL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock);
savederrno = EBUSY;
err = -1;
goto exit_error;
}
if (info->on_listener_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
close(info->listen_sock);
for (i=0; i< MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] > -1) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->accepted_socks[i];
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 0;
free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data);
close(info->accepted_socks[i]);
if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->accepted_socks[i] = -1;
}
}
knet_list_del(&info->list);
free(info);
this_link_info->listener = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* Links config/clear. Both called with global wrlock from link_set_config/clear_config
*/
int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int savederrno = 0, err = 0;
sctp_connect_link_info_t *info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
info = malloc(sizeof(sctp_connect_link_info_t));
if (!info) {
goto exit_error;
}
memset(info, 0, sizeof(sctp_connect_link_info_t));
kn_link->transport_link = info;
info->link = kn_link;
memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
info->on_connected_epoll = 0;
info->connect_sock = -1;
info->listener = sctp_link_listener_start(knet_h, kn_link);
if (!info->listener) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (kn_link->dynamic == KNET_LINK_STATIC) {
if (_create_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
kn_link->outsock = info->connect_sock;
}
knet_list_add(&info->list, &handle_info->connect_links_list);
exit_error:
if (err) {
if (info) {
if (info->connect_sock) {
close(info->connect_sock);
}
if (info->listener) {
sctp_link_listener_stop(knet_h, kn_link);
}
kn_link->transport_link = NULL;
free(info);
}
}
errno = savederrno;
return err;
}
/*
* called with global wrlock
*/
int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info;
struct epoll_event ev;
if (!kn_link) {
errno = EINVAL;
return -1;
}
info = kn_link->transport_link;
if (!info) {
errno = EINVAL;
return -1;
}
if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener trasport: %s",
strerror(savederrno));
goto exit_error;
}
if (info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_rx_epoll = 0;
}
if (_close_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
strerror(savederrno));
goto exit_error;
}
knet_list_del(&info->list);
free(info);
kn_link->transport_link = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* transport_free and transport_init are
* called only from knet_handle_new and knet_handle_free.
* all resources (hosts/links) should have been already freed at this point
* and they are called in a write locked context, hence they
* don't need their own locking.
*/
int sctp_transport_free(knet_handle_t knet_h)
{
sctp_handle_info_t *handle_info;
void *thread_status;
struct epoll_event ev;
if (!knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EINVAL;
return -1;
}
handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* keep it here while we debug list usage and such
*/
if (!knet_list_empty(&handle_info->listen_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
}
if (!knet_list_empty(&handle_info->connect_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
}
if (handle_info->listen_thread) {
pthread_cancel(handle_info->listen_thread);
pthread_join(handle_info->listen_thread, &thread_status);
}
if (handle_info->connect_thread) {
pthread_cancel(handle_info->connect_thread);
pthread_join(handle_info->connect_thread, &thread_status);
}
if (handle_info->listensockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev);
}
if (handle_info->connectsockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev);
}
_close_socketpair(knet_h, handle_info->connectsockfd);
_close_socketpair(knet_h, handle_info->listensockfd);
if (handle_info->listen_epollfd >= 0) {
close(handle_info->listen_epollfd);
}
if (handle_info->connect_epollfd >= 0) {
close(handle_info->connect_epollfd);
}
free(handle_info);
knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;
return 0;
}
int sctp_transport_init(knet_handle_t knet_h)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info;
struct epoll_event ev;
if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EEXIST;
return -1;
}
handle_info = malloc(sizeof(sctp_handle_info_t));
if (!handle_info) {
return -1;
}
memset(handle_info, 0,sizeof(sctp_handle_info_t));
knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;
knet_list_init(&handle_info->listen_links_list);
knet_list_init(&handle_info->connect_links_list);
handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->listen_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->listen_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->connect_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->connect_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
/*
* Start connect & listener threads
*/
savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
strerror(savederrno));
goto exit_fail;
}
exit_fail:
if (err < 0) {
sctp_transport_free(knet_h);
}
errno = savederrno;
return err;
}
int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
kn_link->outsock = sockfd;
kn_link->status.dynconnected = 1;
kn_link->transport_connected = 1;
return 0;
}
#endif
diff --git a/libknet/transports.c b/libknet/transports.c
index 7cc52f2a..62a34b42 100644
--- a/libknet/transports.c
+++ b/libknet/transports.c
@@ -1,267 +1,268 @@
/*
* Copyright (C) 2017 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transports.h"
#include "transport_loopback.h"
#include "transport_udp.h"
#include "transport_sctp.h"
+#include "threads_common.h"
#define empty_module 0, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL },
knet_transport_ops_t transport_modules_cmd[] = {
{ "LOOPBACK", KNET_TRANSPORT_LOOPBACK, 1, KNET_PMTUD_LOOPBACK_OVERHEAD, loopback_transport_init, loopback_transport_free, loopback_transport_link_set_config, loopback_transport_link_clear_config, loopback_transport_link_dyn_connect, loopback_transport_rx_sock_error, loopback_transport_tx_sock_error, loopback_transport_rx_is_data },
{ "UDP", KNET_TRANSPORT_UDP, 1, KNET_PMTUD_UDP_OVERHEAD, udp_transport_init, udp_transport_free, udp_transport_link_set_config, udp_transport_link_clear_config, udp_transport_link_dyn_connect, udp_transport_rx_sock_error, udp_transport_tx_sock_error, udp_transport_rx_is_data },
{ "SCTP", KNET_TRANSPORT_SCTP,
#ifdef HAVE_NETINET_SCTP_H
1, KNET_PMTUD_SCTP_OVERHEAD, sctp_transport_init, sctp_transport_free, sctp_transport_link_set_config, sctp_transport_link_clear_config, sctp_transport_link_dyn_connect, sctp_transport_rx_sock_error, sctp_transport_tx_sock_error, sctp_transport_rx_is_data },
#else
empty_module
#endif
{ NULL, KNET_MAX_TRANSPORTS, empty_module
};
/*
* transport wrappers
*/
int start_all_transports(knet_handle_t knet_h)
{
int idx = 0, savederrno = 0, err = 0;
while (transport_modules_cmd[idx].transport_name != NULL) {
if (transport_modules_cmd[idx].built_in) {
if (transport_modules_cmd[idx].transport_init(knet_h) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE,
"Failed to allocate transport handle for %s: %s",
transport_modules_cmd[idx].transport_name,
strerror(savederrno));
err = -1;
goto out;
}
}
idx++;
}
out:
errno = savederrno;
return err;
}
void stop_all_transports(knet_handle_t knet_h)
{
int idx = 0;
while (transport_modules_cmd[idx].transport_name != NULL) {
if (transport_modules_cmd[idx].built_in) {
transport_modules_cmd[idx].transport_free(knet_h);
}
idx++;
}
}
int transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link, uint8_t transport)
{
if (!transport_modules_cmd[transport].built_in) {
errno = EINVAL;
return -1;
}
kn_link->transport_connected = 0;
kn_link->transport_type = transport;
kn_link->proto_overhead = transport_modules_cmd[transport].transport_mtu_overhead;
return transport_modules_cmd[transport].transport_link_set_config(knet_h, kn_link);
}
int transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
return transport_modules_cmd[kn_link->transport_type].transport_link_clear_config(knet_h, kn_link);
}
int transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
return transport_modules_cmd[kn_link->transport_type].transport_link_dyn_connect(knet_h, sockfd, kn_link);
}
int transport_rx_sock_error(knet_handle_t knet_h, uint8_t transport, int sockfd, int recv_err, int recv_errno)
{
return transport_modules_cmd[transport].transport_rx_sock_error(knet_h, sockfd, recv_err, recv_errno);
}
int transport_tx_sock_error(knet_handle_t knet_h, uint8_t transport, int sockfd, int recv_err, int recv_errno)
{
return transport_modules_cmd[transport].transport_tx_sock_error(knet_h, sockfd, recv_err, recv_errno);
}
int transport_rx_is_data(knet_handle_t knet_h, uint8_t transport, int sockfd, struct knet_mmsghdr *msg)
{
return transport_modules_cmd[transport].transport_rx_is_data(knet_h, sockfd, msg);
}
/*
* public api
*/
int knet_get_transport_list(struct knet_transport_info *transport_list,
size_t *transport_list_entries)
{
int err = 0;
int idx = 0;
int outidx = 0;
if (!transport_list_entries) {
errno = EINVAL;
return -1;
}
while (transport_modules_cmd[idx].transport_name != NULL) {
if (transport_modules_cmd[idx].built_in) {
if (transport_list) {
transport_list[outidx].name = transport_modules_cmd[idx].transport_name;
transport_list[outidx].id = transport_modules_cmd[idx].transport_id;
}
outidx++;
}
idx++;
}
*transport_list_entries = outidx;
return err;
}
const char *knet_get_transport_name_by_id(uint8_t transport)
{
int savederrno = 0;
const char *name = NULL;
if (transport == KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return name;
}
if ((transport_modules_cmd[transport].transport_name) &&
(transport_modules_cmd[transport].built_in)) {
name = transport_modules_cmd[transport].transport_name;
} else {
savederrno = ENOENT;
}
errno = savederrno;
return name;
}
uint8_t knet_get_transport_id_by_name(const char *name)
{
int savederrno = 0;
uint8_t err = KNET_MAX_TRANSPORTS;
int i, found;
if (!name) {
errno = EINVAL;
return err;
}
i = 0;
found = 0;
while (transport_modules_cmd[i].transport_name != NULL) {
if (transport_modules_cmd[i].built_in) {
if (!strcmp(transport_modules_cmd[i].transport_name, name)) {
err = transport_modules_cmd[i].transport_id;
found = 1;
break;
}
}
i++;
}
if (!found) {
savederrno = EINVAL;
}
errno = savederrno;
return err;
}
int knet_handle_set_transport_reconnect_interval(knet_handle_t knet_h, uint32_t msecs)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!msecs) {
errno = EINVAL;
return -1;
}
if (msecs < 1000) {
log_warn(knet_h, KNET_SUB_HANDLE, "reconnect internval below 1 sec (%u msecs) might be too aggressive", msecs);
}
if (msecs > 60000) {
log_warn(knet_h, KNET_SUB_HANDLE, "reconnect internval above 1 minute (%u msecs) could cause long delays in network convergiance", msecs);
}
- savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+ savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->reconnect_int = msecs;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_get_transport_reconnect_interval(knet_handle_t knet_h, uint32_t *msecs)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!msecs) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*msecs = knet_h->reconnect_int;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Sep 1, 7:06 PM (1 d, 5 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2280346
Default Alt Text
(174 KB)

Event Timeline