Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/handle.c b/libknet/handle.c
index e95c6c14..251d3320 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1653 +1,1653 @@
/*
* Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/uio.h>
#include <math.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "internals.h"
#include "crypto.h"
#include "links.h"
#include "compress.h"
#include "compat.h"
#include "common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_dsthandler.h"
#include "threads_rx.h"
#include "threads_tx.h"
#include "transports.h"
#include "transport_common.h"
#include "logging.h"
static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_rwlock_t shlib_rwlock;
static uint8_t shlib_wrlock_init = 0;
static uint32_t knet_ref = 0;
static int _init_shlib_tracker(knet_handle_t knet_h)
{
int savederrno = 0;
if (!shlib_wrlock_init) {
savederrno = pthread_rwlock_init(&shlib_rwlock, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize shared lib rwlock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
shlib_wrlock_init = 1;
}
return 0;
}
static void _fini_shlib_tracker(void)
{
if (knet_ref == 0) {
pthread_rwlock_destroy(&shlib_rwlock);
shlib_wrlock_init = 0;
}
return;
}
static int _init_locks(knet_handle_t knet_h)
{
int savederrno = 0;
savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->threads_status_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize threads status mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->kmtu_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize kernel_mtu mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->backoff_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pong timeout backoff mutex: %s",
strerror(savederrno));
goto exit_fail;
}
savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _destroy_locks(knet_handle_t knet_h)
{
pthread_rwlock_destroy(&knet_h->global_rwlock);
pthread_mutex_destroy(&knet_h->pmtud_mutex);
pthread_mutex_destroy(&knet_h->kmtu_mutex);
pthread_cond_destroy(&knet_h->pmtud_cond);
pthread_mutex_destroy(&knet_h->hb_mutex);
pthread_mutex_destroy(&knet_h->tx_mutex);
pthread_mutex_destroy(&knet_h->backoff_mutex);
pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
pthread_mutex_destroy(&knet_h->threads_status_mutex);
}
static int _init_socks(knet_handle_t knet_h)
{
int savederrno = 0;
if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _close_socks(knet_handle_t knet_h)
{
_close_socketpair(knet_h, knet_h->dstsockfd);
_close_socketpair(knet_h, knet_h->hostsockfd);
}
static int _init_buffers(knet_handle_t knet_h)
{
int savederrno = 0;
int i;
size_t bufsize;
for (i = 0; i < PCKT_FRAG_MAX; i++) {
bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE;
knet_h->send_to_links_buf[i] = malloc(bufsize);
if (!knet_h->send_to_links_buf[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf[i], 0, bufsize);
}
for (i = 0; i < PCKT_RX_BUFS; i++) {
knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE);
if (!knet_h->recv_from_links_buf[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE);
}
knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
if (!knet_h->recv_from_sock_buf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);
knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
if (!knet_h->pingbuf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);
knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
if (!knet_h->pmtudbuf) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);
for (i = 0; i < PCKT_FRAG_MAX; i++) {
bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
knet_h->send_to_links_buf_crypt[i] = malloc(bufsize);
if (!knet_h->send_to_links_buf_crypt[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize);
}
knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->recv_from_links_buf_decrypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->recv_from_links_buf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->pingbuf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
if (!knet_h->pmtudbuf_crypt) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
if (!knet_h->recv_from_links_buf_decompress) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);
knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
if (!knet_h->send_to_links_buf_compress) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
strerror(savederrno));
goto exit_fail;
}
memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);
memset(knet_h->knet_transport_fd_tracker, 0, sizeof(knet_h->knet_transport_fd_tracker));
for (i = 0; i < KNET_MAX_FDS; i++) {
knet_h->knet_transport_fd_tracker[i].transport = KNET_MAX_TRANSPORTS;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _destroy_buffers(knet_handle_t knet_h)
{
int i;
for (i = 0; i < PCKT_FRAG_MAX; i++) {
free(knet_h->send_to_links_buf[i]);
free(knet_h->send_to_links_buf_crypt[i]);
}
for (i = 0; i < PCKT_RX_BUFS; i++) {
free(knet_h->recv_from_links_buf[i]);
}
free(knet_h->recv_from_links_buf_decompress);
free(knet_h->send_to_links_buf_compress);
free(knet_h->recv_from_sock_buf);
free(knet_h->recv_from_links_buf_decrypt);
free(knet_h->recv_from_links_buf_crypt);
free(knet_h->pingbuf);
free(knet_h->pingbuf_crypt);
free(knet_h->pmtudbuf);
free(knet_h->pmtudbuf_crypt);
}
static int _init_epolls(knet_handle_t knet_h)
{
struct epoll_event ev;
int savederrno = 0;
/*
* even if the kernel does dynamic allocation with epoll_ctl
* we need to reserve one extra for host to host communication
*/
knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (knet_h->send_to_links_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
strerror(savederrno));
goto exit_fail;
}
knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
if (knet_h->recv_from_links_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
strerror(savederrno));
goto exit_fail;
}
knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
if (knet_h->dst_link_handler_epollfd < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->send_to_links_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->hostsockfd[0];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->dstsockfd[0];
if (epoll_ctl(knet_h->dst_link_handler_epollfd,
EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _close_epolls(knet_handle_t knet_h)
{
struct epoll_event ev;
int i;
memset(&ev, 0, sizeof(struct epoll_event));
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (knet_h->sockfd[i].in_use) {
epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev);
if (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) {
_close_socketpair(knet_h, knet_h->sockfd[i].sockfd);
}
}
}
epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);
close(knet_h->send_to_links_epollfd);
close(knet_h->recv_from_links_epollfd);
close(knet_h->dst_link_handler_epollfd);
}
static int _start_threads(knet_handle_t knet_h)
{
int savederrno = 0;
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
_handle_pmtud_link_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_DST_LINK, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0,
_handle_dst_link_handler_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&knet_h->send_to_links_thread, 0,
_handle_send_to_links_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&knet_h->recv_from_links_thread, 0,
_handle_recv_from_links_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&knet_h->heartbt_thread, 0,
_handle_heartbt_thread, (void *) knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
strerror(savederrno));
goto exit_fail;
}
return 0;
exit_fail:
errno = savederrno;
return -1;
}
static void _stop_threads(knet_handle_t knet_h)
{
void *retval;
wait_all_threads_status(knet_h, KNET_THREAD_STOPPED);
if (knet_h->heartbt_thread) {
pthread_cancel(knet_h->heartbt_thread);
pthread_join(knet_h->heartbt_thread, &retval);
}
if (knet_h->send_to_links_thread) {
pthread_cancel(knet_h->send_to_links_thread);
pthread_join(knet_h->send_to_links_thread, &retval);
}
if (knet_h->recv_from_links_thread) {
pthread_cancel(knet_h->recv_from_links_thread);
pthread_join(knet_h->recv_from_links_thread, &retval);
}
if (knet_h->dst_link_handler_thread) {
pthread_cancel(knet_h->dst_link_handler_thread);
pthread_join(knet_h->dst_link_handler_thread, &retval);
}
if (knet_h->pmtud_link_handler_thread) {
pthread_cancel(knet_h->pmtud_link_handler_thread);
pthread_join(knet_h->pmtud_link_handler_thread, &retval);
}
}
knet_handle_t knet_handle_new_ex(knet_node_id_t host_id,
int log_fd,
uint8_t default_log_level,
uint64_t flags)
{
knet_handle_t knet_h;
int savederrno = 0;
struct rlimit cur;
if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
return NULL;
}
if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
errno = EINVAL;
return NULL;
}
/*
* validate incoming request
*/
if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
errno = EINVAL;
return NULL;
}
if (flags > KNET_HANDLE_FLAG_PRIVILEGED * 2 - 1) {
errno = EINVAL;
return NULL;
}
/*
* allocate handle
*/
knet_h = malloc(sizeof(struct knet_handle));
if (!knet_h) {
errno = ENOMEM;
return NULL;
}
memset(knet_h, 0, sizeof(struct knet_handle));
/*
* setting up some handle data so that we can use logging
* also when initializing the library global locks
* and trackers
*/
knet_h->flags = flags;
/*
* copy config in place
*/
knet_h->host_id = host_id;
knet_h->logfd = log_fd;
if (knet_h->logfd > 0) {
memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
}
/*
* set pmtud default timers
*/
knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;
/*
* set transports reconnect default timers
*/
knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;
/*
* Set 'min' stats to the maximum value so the
* first value we get is always less
*/
knet_h->stats.tx_compress_time_min = UINT64_MAX;
knet_h->stats.rx_compress_time_min = UINT64_MAX;
knet_h->stats.tx_crypt_time_min = UINT64_MAX;
knet_h->stats.rx_crypt_time_min = UINT64_MAX;
/*
* init global shlib tracker
*/
savederrno = pthread_mutex_lock(&handle_config_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
strerror(savederrno));
free(knet_h);
knet_h = NULL;
errno = savederrno;
return NULL;
}
knet_ref++;
if (_init_shlib_tracker(knet_h) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to init handles traceker: %s",
strerror(savederrno));
errno = savederrno;
goto exit_fail;
}
pthread_mutex_unlock(&handle_config_mutex);
/*
* init main locking structures
*/
if (_init_locks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* init sockets
*/
if (_init_socks(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* allocate packet buffers
*/
if (_init_buffers(knet_h)) {
savederrno = errno;
goto exit_fail;
}
if (compress_init(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* create epoll fds
*/
if (_init_epolls(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start transports
*/
if (start_all_transports(knet_h)) {
savederrno = errno;
goto exit_fail;
}
/*
* start internal threads
*/
if (_start_threads(knet_h)) {
savederrno = errno;
goto exit_fail;
}
wait_all_threads_status(knet_h, KNET_THREAD_STARTED);
errno = 0;
return knet_h;
exit_fail:
knet_handle_free(knet_h);
errno = savederrno;
return NULL;
}
knet_handle_t knet_handle_new(knet_node_id_t host_id,
int log_fd,
uint8_t default_log_level)
{
return knet_handle_new_ex(host_id, log_fd, default_log_level, KNET_HANDLE_FLAG_PRIVILEGED);
}
int knet_handle_free(knet_handle_t knet_h)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (knet_h->host_head != NULL) {
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_HANDLE,
"Unable to free handle: host(s) or listener(s) are still active: %s",
strerror(savederrno));
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return -1;
}
knet_h->fini_in_progress = 1;
pthread_rwlock_unlock(&knet_h->global_rwlock);
_stop_threads(knet_h);
stop_all_transports(knet_h);
_close_epolls(knet_h);
_destroy_buffers(knet_h);
_close_socks(knet_h);
crypto_fini(knet_h);
compress_fini(knet_h, 1);
_destroy_locks(knet_h);
free(knet_h);
knet_h = NULL;
(void)pthread_mutex_lock(&handle_config_mutex);
knet_ref--;
_fini_shlib_tracker();
pthread_mutex_unlock(&handle_config_mutex);
errno = 0;
return 0;
}
int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!sock_notify_fn) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
knet_h->sock_notify_fn = sock_notify_fn;
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
if (*channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
savederrno = EEXIST;
err = -1;
goto out_unlock;
}
}
}
/*
* auto allocate a channel
*/
if (*channel < 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if (!knet_h->sockfd[i].in_use) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
} else {
if (knet_h->sockfd[*channel].in_use) {
savederrno = EBUSY;
err = -1;
goto out_unlock;
}
}
knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;
if (*datafd > 0) {
int sockopt;
socklen_t sockoptlen = sizeof(sockopt);
if (_fdset_cloexec(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
if (_fdset_nonblock(*datafd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
strerror(savederrno));
goto out_unlock;
}
knet_h->sockfd[*channel].sockfd[0] = *datafd;
knet_h->sockfd[*channel].sockfd[1] = 0;
if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
knet_h->sockfd[*channel].is_socket = 1;
}
} else {
if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
savederrno = errno;
err = -1;
goto out_unlock;
}
knet_h->sockfd[*channel].is_created = 1;
knet_h->sockfd[*channel].is_socket = 1;
*datafd = knet_h->sockfd[*channel].sockfd[0];
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
if (knet_h->sockfd[*channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
}
goto out_unlock;
}
knet_h->sockfd[*channel].in_use = 1;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
{
int err = 0, savederrno = 0;
int8_t channel = -1;
int i;
struct epoll_event ev;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
channel = i;
break;
}
}
if (channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if (!knet_h->sockfd[channel].has_error) {
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
goto out_unlock;
}
}
if (knet_h->sockfd[channel].is_created) {
_close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
}
memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
int err = 0, savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
errno = EINVAL;
return -1;
}
if (datafd == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
*datafd = knet_h->sockfd[channel].sockfd[0];
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
int i;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (datafd <= 0) {
errno = EINVAL;
return -1;
}
if (channel == NULL) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*channel = -1;
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) &&
(knet_h->sockfd[i].sockfd[0] == datafd)) {
*channel = i;
break;
}
}
if (*channel < 0) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_enable_filter(knet_handle_t knet_h,
void *dst_host_filter_fn_private_data,
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
knet_h->dst_host_filter_fn = dst_host_filter_fn;
if (knet_h->dst_host_filter_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->enabled = enabled;
if (enabled) {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->use_access_lists = enabled;
if (enabled) {
log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*interval = knet_h->pmtud_interval;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if ((!interval) || (interval > 86400)) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_interval = interval;
log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
void *pmtud_notify_fn_private_data,
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu))
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
knet_h->pmtud_notify_fn = pmtud_notify_fn;
if (knet_h->pmtud_notify_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_pmtud_get(knet_handle_t knet_h,
unsigned int *data_mtu)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!data_mtu) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
*data_mtu = knet_h->data_mtu;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = 0;
return 0;
}
int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_crypto_cfg) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) ||
((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
(!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
crypto_fini(knet_h);
log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
err = 0;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
err = crypto_init(knet_h, knet_handle_crypto_cfg);
if (err) {
err = -2;
savederrno = errno;
}
exit_unlock:
if (!err) {
- force_pmtud_run(knet_h, KNET_SUB_CRYPTO);
+ force_pmtud_run(knet_h, KNET_SUB_CRYPTO, 1);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
{
int savederrno = 0;
int err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!knet_handle_compress_cfg) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
compress_fini(knet_h, 0);
err = compress_cfg(knet_h, knet_handle_compress_cfg);
savederrno = errno;
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_out[1];
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *)buff;
iov_out[0].iov_len = buff_len;
err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!stats) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (struct_size > sizeof(struct knet_handle_stats)) {
struct_size = sizeof(struct knet_handle_stats);
}
memmove(stats, &knet_h->stats, struct_size);
/*
* TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
* RX is OK as it counts them before they are sorted.
*/
stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
knet_h->stats_extra.tx_crypt_pong_packets +
knet_h->stats_extra.tx_crypt_pmtu_packets +
knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
/* Tell the caller our full size in case they have an old version */
stats->size = sizeof(struct knet_handle_stats);
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
{
int savederrno = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
_link_clear_stats(knet_h);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
return 0;
}
diff --git a/libknet/threads_common.c b/libknet/threads_common.c
index 61ffd826..53a6f9fb 100644
--- a/libknet/threads_common.c
+++ b/libknet/threads_common.c
@@ -1,178 +1,187 @@
/*
* Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include "internals.h"
#include "logging.h"
#include "threads_common.h"
int shutdown_in_progress(knet_handle_t knet_h)
{
int savederrno = 0;
int ret;
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMMON, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
ret = knet_h->fini_in_progress;
pthread_rwlock_unlock(&knet_h->global_rwlock);
return ret;
}
static int pmtud_reschedule(knet_handle_t knet_h)
{
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
if (knet_h->pmtud_running) {
knet_h->pmtud_abort = 1;
if (knet_h->pmtud_waiting) {
pthread_cond_signal(&knet_h->pmtud_cond);
}
}
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
int get_global_wrlock(knet_handle_t knet_h)
{
if (pmtud_reschedule(knet_h) < 0) {
log_info(knet_h, KNET_SUB_PMTUD, "Unable to notify PMTUd to reschedule. Expect delays in executing API calls");
}
return pthread_rwlock_wrlock(&knet_h->global_rwlock);
}
static struct pretty_names thread_names[KNET_THREAD_MAX] =
{
{ "TX", KNET_THREAD_TX },
{ "RX", KNET_THREAD_RX },
{ "HB", KNET_THREAD_HB },
{ "PMTUD", KNET_THREAD_PMTUD },
#ifdef HAVE_NETINET_SCTP_H
{ "SCTP_LISTEN", KNET_THREAD_SCTP_LISTEN },
{ "SCTP_CONN", KNET_THREAD_SCTP_CONN },
#endif
{ "DST_LINK", KNET_THREAD_DST_LINK }
};
static struct pretty_names thread_status[] =
{
{ "unregistered", KNET_THREAD_UNREGISTERED },
{ "registered", KNET_THREAD_REGISTERED },
{ "started", KNET_THREAD_STARTED },
{ "stopped", KNET_THREAD_STOPPED }
};
static const char *get_thread_status_name(uint8_t status)
{
unsigned int i;
for (i = 0; i < KNET_THREAD_STATUS_MAX; i++) {
if (thread_status[i].val == status) {
return thread_status[i].name;
}
}
return "unknown";
}
static const char *get_thread_name(uint8_t thread_id)
{
unsigned int i;
for (i = 0; i < KNET_THREAD_MAX; i++) {
if (thread_names[i].val == thread_id) {
return thread_names[i].name;
}
}
return "unknown";
}
int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
{
if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
return -1;
}
knet_h->threads_status[thread_id] = status;
log_debug(knet_h, KNET_SUB_HANDLE, "Updated status for thread %s to %s",
get_thread_name(thread_id), get_thread_status_name(status));
pthread_mutex_unlock(&knet_h->threads_status_mutex);
return 0;
}
int wait_all_threads_status(knet_handle_t knet_h, uint8_t status)
{
uint8_t i = 0, found = 0;
while (!found) {
usleep(KNET_THREADS_TIMERES);
if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
continue;
}
found = 1;
for (i = 0; i < KNET_THREAD_MAX; i++) {
if (knet_h->threads_status[i] == KNET_THREAD_UNREGISTERED) {
continue;
}
log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s status: %s req: %s",
get_thread_name(i),
get_thread_status_name(knet_h->threads_status[i]),
get_thread_status_name(status));
if (knet_h->threads_status[i] != status) {
found = 0;
}
}
pthread_mutex_unlock(&knet_h->threads_status_mutex);
}
return 0;
}
-void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem)
+void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu)
{
+ if (reset_mtu) {
+ log_debug(knet_h, subsystem, "PMTUd has been reset to default");
+ knet_h->data_mtu = KNET_PMTUD_MIN_MTU_V4 - KNET_HEADER_ALL_SIZE - knet_h->sec_header_size;
+ if (knet_h->pmtud_notify_fn) {
+ knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
+ knet_h->data_mtu);
+ }
+ }
+
/*
* we can only try to take a lock here. This part of the code
* can be invoked by any thread, including PMTUd that is already
* holding a lock at that stage.
* If PMTUd is holding the lock, most likely it is already running
* and we don't need to notify it back.
*/
if (!pthread_mutex_trylock(&knet_h->pmtud_mutex)) {
if (!knet_h->pmtud_running) {
if (!knet_h->pmtud_forcerun) {
log_debug(knet_h, subsystem, "Notifying PMTUd to rerun");
knet_h->pmtud_forcerun = 1;
}
}
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}
}
diff --git a/libknet/threads_common.h b/libknet/threads_common.h
index 19336ce7..cff76917 100644
--- a/libknet/threads_common.h
+++ b/libknet/threads_common.h
@@ -1,50 +1,50 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#ifndef __KNET_THREADS_COMMON_H__
#define __KNET_THREADS_COMMON_H__
#include "internals.h"
#define KNET_THREADS_TIMERES 200000
#define KNET_THREAD_UNREGISTERED 0 /* thread does not exist */
#define KNET_THREAD_REGISTERED 1 /* thread has been registered before pthread_create invocation.
make sure threads are registered before calling wait_all_thread_status */
#define KNET_THREAD_STARTED 2 /* thread has reported to be running */
#define KNET_THREAD_STOPPED 3 /* thread has returned */
#define KNET_THREAD_STATUS_MAX KNET_THREAD_STOPPED + 1
#define KNET_THREAD_TX 0
#define KNET_THREAD_RX 1
#define KNET_THREAD_HB 2
#define KNET_THREAD_PMTUD 3
#define KNET_THREAD_DST_LINK 4
#ifdef HAVE_NETINET_SCTP_H
#define KNET_THREAD_SCTP_LISTEN 5
#define KNET_THREAD_SCTP_CONN 6
#endif
#define KNET_THREAD_MAX 32
#define timespec_diff(start, end, diff) \
do { \
if (end.tv_sec > start.tv_sec) \
*(diff) = ((end.tv_sec - start.tv_sec) * 1000000000llu) \
+ end.tv_nsec - start.tv_nsec; \
else \
*(diff) = end.tv_nsec - start.tv_nsec; \
} while (0);
int shutdown_in_progress(knet_handle_t knet_h);
int get_global_wrlock(knet_handle_t knet_h);
int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
int wait_all_threads_status(knet_handle_t knet_h, uint8_t status);
-void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem);
+void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu);
#endif
diff --git a/libknet/transport_udp.c b/libknet/transport_udp.c
index 232dbcb0..15374389 100644
--- a/libknet/transport_udp.c
+++ b/libknet/transport_udp.c
@@ -1,431 +1,431 @@
/*
* Copyright (C) 2016-2019 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/ip_icmp.h>
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
#include <linux/errqueue.h>
#endif
#include "libknet.h"
#include "compat.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "common.h"
#include "transport_common.h"
#include "transport_udp.h"
#include "threads_common.h"
typedef struct udp_handle_info {
struct knet_list_head links_list;
} udp_handle_info_t;
typedef struct udp_link_info {
struct knet_list_head list;
struct sockaddr_storage local_address;
int socket_fd;
int on_epoll;
} udp_link_info_t;
int udp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int sock = -1;
struct epoll_event ev;
udp_link_info_t *info;
udp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_UDP];
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
int value;
#endif
/*
* Only allocate a new link if the local address is different
*/
knet_list_for_each_entry(info, &handle_info->links_list, list) {
if (memcmp(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Re-using existing UDP socket for new link");
kn_link->outsock = info->socket_fd;
kn_link->transport_link = info;
kn_link->transport_connected = 1;
return 0;
}
}
info = malloc(sizeof(udp_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(udp_link_info_t));
sock = socket(kn_link->src_addr.ss_family, SOCK_DGRAM, 0);
if (sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_transport_socket(knet_h, sock, &kn_link->src_addr, kn_link->flags, "UDP") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
#ifdef IP_RECVERR
if (kn_link->src_addr.ss_family == AF_INET) {
value = 1;
if (setsockopt(sock, SOL_IP, IP_RECVERR, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s",
strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR enabled on socket: %i", sock);
}
#else
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IP_RECVERR not available in this build/platform");
#endif
#ifdef IPV6_RECVERR
if (kn_link->src_addr.ss_family == AF_INET6) {
value = 1;
if (setsockopt(sock, SOL_IPV6, IPV6_RECVERR, &value, sizeof(value)) <0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set RECVERR on socket: %s",
strerror(savederrno));
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR enabled on socket: %i", sock);
}
#else
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "IPV6_RECVERR not available in this build/platform");
#endif
if (bind(sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr))) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_epoll = 1;
if (_set_fd_tracker(knet_h, sock, KNET_TRANSPORT_UDP, 0, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memmove(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
info->socket_fd = sock;
knet_list_add(&info->list, &handle_info->links_list);
kn_link->outsock = sock;
kn_link->transport_link = info;
kn_link->transport_connected = 1;
exit_error:
if (err) {
if (info) {
if (info->on_epoll) {
epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sock, &ev);
}
free(info);
}
if (sock >= 0) {
close(sock);
}
}
errno = savederrno;
return err;
}
int udp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int found = 0;
struct knet_host *host;
int link_idx;
udp_link_info_t *info = kn_link->transport_link;
struct epoll_event ev;
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&host->link[link_idx] == kn_link)
continue;
if (host->link[link_idx].transport_link == info) {
found = 1;
break;
}
}
}
if (found) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "UDP socket %d still in use", info->socket_fd);
savederrno = EBUSY;
err = -1;
goto exit_error;
}
if (info->on_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->socket_fd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->socket_fd, &ev) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to remove UDP socket from epoll poll: %s",
strerror(errno));
goto exit_error;
}
info->on_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->socket_fd, KNET_MAX_TRANSPORTS, 0, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
close(info->socket_fd);
knet_list_del(&info->list);
free(kn_link->transport_link);
exit_error:
errno = savederrno;
return err;
}
int udp_transport_free(knet_handle_t knet_h)
{
udp_handle_info_t *handle_info;
if (!knet_h->transports[KNET_TRANSPORT_UDP]) {
errno = EINVAL;
return -1;
}
handle_info = knet_h->transports[KNET_TRANSPORT_UDP];
/*
* keep it here while we debug list usage and such
*/
if (!knet_list_empty(&handle_info->links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_UDP, "Internal error. handle list is not empty");
return -1;
}
free(handle_info);
knet_h->transports[KNET_TRANSPORT_UDP] = NULL;
return 0;
}
int udp_transport_init(knet_handle_t knet_h)
{
udp_handle_info_t *handle_info;
if (knet_h->transports[KNET_TRANSPORT_UDP]) {
errno = EEXIST;
return -1;
}
handle_info = malloc(sizeof(udp_handle_info_t));
if (!handle_info) {
return -1;
}
memset(handle_info, 0, sizeof(udp_handle_info_t));
knet_h->transports[KNET_TRANSPORT_UDP] = handle_info;
knet_list_init(&handle_info->links_list);
return 0;
}
#if defined (IP_RECVERR) || defined (IPV6_RECVERR)
static int read_errs_from_sock(knet_handle_t knet_h, int sockfd)
{
int err = 0, savederrno = 0;
int got_err = 0;
char buffer[1024];
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
struct sock_extended_err *sock_err;
struct icmphdr icmph;
struct sockaddr_storage remote;
struct sockaddr_storage *origin;
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
char addr_remote_str[KNET_MAX_HOST_LEN];
char port_remote_str[KNET_MAX_PORT_LEN];
iov.iov_base = &icmph;
iov.iov_len = sizeof(icmph);
msg.msg_name = (void*)&remote;
msg.msg_namelen = sizeof(remote);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
msg.msg_control = buffer;
msg.msg_controllen = sizeof(buffer);
for (;;) {
err = recvmsg(sockfd, &msg, MSG_ERRQUEUE);
savederrno = errno;
if (err < 0) {
if (!got_err) {
errno = savederrno;
return -1;
} else {
return 0;
}
}
got_err = 1;
for (cmsg = CMSG_FIRSTHDR(&msg);cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (((cmsg->cmsg_level == SOL_IP) && (cmsg->cmsg_type == IP_RECVERR)) ||
((cmsg->cmsg_level == SOL_IPV6 && (cmsg->cmsg_type == IPV6_RECVERR)))) {
sock_err = (struct sock_extended_err*)(void *)CMSG_DATA(cmsg);
if (sock_err) {
switch (sock_err->ee_origin) {
case SO_EE_ORIGIN_NONE: /* no origin */
case SO_EE_ORIGIN_LOCAL: /* local source (EMSGSIZE) */
if (sock_err->ee_errno == EMSGSIZE) {
if (pthread_mutex_lock(&knet_h->kmtu_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Unable to get mutex lock");
knet_h->kernel_mtu = 0;
break;
} else {
knet_h->kernel_mtu = sock_err->ee_info;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
- force_pmtud_run(knet_h, KNET_SUB_TRANSP_UDP);
+ force_pmtud_run(knet_h, KNET_SUB_TRANSP_UDP, 0);
}
/*
* those errors are way too noisy
*/
break;
case SO_EE_ORIGIN_ICMP: /* ICMP */
case SO_EE_ORIGIN_ICMP6: /* ICMP6 */
origin = (struct sockaddr_storage *)(void *)SO_EE_OFFENDER(sock_err);
if (knet_addrtostr(origin, sizeof(*origin),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from unknown source: %s", strerror(sock_err->ee_errno));
} else {
if (knet_addrtostr(&remote, sizeof(remote),
addr_remote_str, KNET_MAX_HOST_LEN,
port_remote_str, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s destination unknown", addr_str, strerror(sock_err->ee_errno));
} else {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Received ICMP error from %s: %s %s", addr_str, strerror(sock_err->ee_errno), addr_remote_str);
}
}
break;
}
} else {
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "No data in MSG_ERRQUEUE");
}
}
}
}
}
#else
static int read_errs_from_sock(knet_handle_t knet_h, int sockfd)
{
return 0;
}
#endif
int udp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
if (recv_errno == EAGAIN) {
read_errs_from_sock(knet_h, sockfd);
}
return 0;
}
int udp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
if (recv_err < 0) {
if (recv_errno == EMSGSIZE) {
read_errs_from_sock(knet_h, sockfd);
return 0;
}
if (recv_errno == EINVAL || recv_errno == EPERM) {
return -1;
}
if ((recv_errno == ENOBUFS) || (recv_errno == EAGAIN)) {
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
usleep(KNET_THREADS_TIMERES / 16);
} else {
read_errs_from_sock(knet_h, sockfd);
}
return 1;
}
return 0;
}
int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
if (msg->msg_len == 0)
return 0;
return 2;
}
int udp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
kn_link->status.dynconnected = 1;
return 0;
}
int udp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
{
return kn_link->outsock;
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Feb 24, 4:38 PM (42 m, 39 s ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464346
Default Alt Text
(57 KB)

Event Timeline