diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index c2dcbd6e..91f797e7 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,1056 +1,1057 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this function needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point in trying to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
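/*
 * reassemble a fragmented packet into inbuf->khp_data_userdata.
 * returns 0 when the last missing fragment arrives and *len has been
 * updated to the full packet length, 1 while fragments are still
 * missing or the packet cannot be defragged (expired, duplicate frag).
 */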
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not in use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last fragment with kid gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event that the last fragment arrives first,
* we still don't know its offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymmetric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
if (defrag_buf->frag_size) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
}
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet length
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
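/*
 * process a single packet received from a link: authenticate/decrypt
 * it if crypto is configured, validate the header, look up the source
 * host and link, then dispatch it based on type (data/host info,
 * ping, pong, PMTUd and PMTUd reply).
 */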
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0, stats_err = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
uint64_t decrypt_time = 0;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
int try_decrypt = 0, decrypted = 0, i;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
try_decrypt = 1;
break;
}
}
if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
return;
}
if (try_decrypt) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
return;
}
log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
} else {
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &decrypt_time);
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
decrypted = 1;
}
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
src_host->host_id, src_link->link_id, strerror(stats_err));
return;
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and re-add it only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (!err) {
/* Collect stats */
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
knet_h->stats.rx_failed_to_decompress++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed pings, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrive in bursts over all the links.
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pong_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
retry_pong:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry sending the same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
}
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
if ((latency_last / 1000llu) > src_link->pong_timeout) {
log_debug(knet_h, KNET_SUB_RX,
"Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
src_host->host_id, src_link->link_id);
} else {
/*
* in words: (('previous mean' * ('count' - 1)) + 'new value') / 'count'
*/
src_link->status.stats.latency_samples++;
/*
* limit to max_samples (precision)
*/
if (src_link->status.stats.latency_samples >= src_link->latency_max_samples) {
src_link->status.stats.latency_samples = src_link->latency_max_samples;
}
src_link->status.stats.latency_ave =
(((src_link->status.stats.latency_ave * (src_link->status.stats.latency_samples - 1)) + (latency_last / 1000llu)) / src_link->status.stats.latency_samples);
if (src_link->status.stats.latency_ave < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.stats.latency_ave > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.stats.latency_ave;
}
if (src_link->status.stats.latency_ave < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.stats.latency_ave;
}
}
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry sending the same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
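/*
 * drain a ready link fd with one recvmmsg() run, let the transport
 * classify each message (data, OOB, error), filter it against the
 * access lists if enabled, and pass data packets on to
 * _parse_recv_from_links().
 */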
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if an fd got an event but the link was removed
* by another thread before we grabbed the read lock
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, the error is recorded on the socket (getsockopt(SO_ERROR))
* and it will be visible in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol basis
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocol packet handlers are good
*/
switch(err) {
case KNET_TRANSPORT_RX_ERROR: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
/*
* processing incoming packets vs access lists
*/
if ((knet_h->use_access_lists) &&
(transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
char src_ipaddr[KNET_MAX_HOST_LEN];
char src_port[KNET_MAX_PORT_LEN];
memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
memset(src_port, 0, KNET_MAX_PORT_LEN);
if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
src_ipaddr, KNET_MAX_HOST_LEN,
src_port, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
} else {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
}
/*
* continue processing the other packets
*/
continue;
}
}
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
break;
case KNET_TRANSPORT_RX_OOB_DATA_STOP:
log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
goto exit_unlock;
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
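/*
 * RX thread entry point: set up the recvmmsg buffers once, then wait
 * for events on the links epoll fd and process each ready socket
 * until shutdown is requested.
 */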
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
memset(&msg, 0, sizeof(msg));
+ memset(&events, 0, sizeof(events));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
/*
* the RX threads only need to notify that there has been at least
* one successful run after queue flush has been requested.
* See setfwd in handle.c
*/
if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
return NULL;
}
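/*
 * public API: validate the arguments, then read up to buff_len bytes
 * for the given channel from its datafd via readv().
 */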
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 3c25e5b8..3e6fb745 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,904 +1,905 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
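/*
 * send a prepared batch of messages to one destination host, walking
 * its active links and retrying partial _sendmmsg() runs. with the
 * round-robin link policy the active link list is rotated after
 * dispatching on a link.
 */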
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0, locked = 0;
unsigned int i;
struct knet_mmsghdr *cur;
struct knet_link *cur_link;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
prev_sent = 0;
progress = 1;
locked = 0;
cur_link = &dst_host->link[dst_host->active_links[link_idx]];
if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
continue;
}
savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
dst_host->host_id, cur_link->link_id, strerror(savederrno));
continue;
}
locked = 1;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
/* Cast for Linux/BSD compatibility */
for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
}
cur_link->status.stats.tx_data_packets++;
msg_idx++;
}
retry:
cur = &msg[prev_sent];
sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport),
&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
switch(err) {
case -1: /* unrecoverable error */
cur_link->status.stats.tx_data_errors++;
goto out_unlock;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry sending the same data */
cur_link->status.stats.tx_data_retries++;
goto retry;
break;
}
prev_sent = prev_sent + sent_msgs;
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
#endif
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
pthread_mutex_unlock(&cur_link->link_stats_mutex);
locked = 0;
}
out_unlock:
if (locked) {
pthread_mutex_unlock(&cur_link->link_stats_mutex);
}
errno = savederrno;
return err;
}
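/*
 * TX core: take one packet from the application, run the dst host
 * filter, optionally compress and encrypt it, fragment it to the
 * current data MTU, and dispatch the fragments to all relevant hosts.
 */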
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
unsigned int temp_data_mtu;
size_t host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
unsigned int i;
int j;
int send_local = 0;
int data_compressed = 0;
size_t uncrypted_frag_size;
int stats_locked = 0, stats_err = 0;
inbuf = knet_h->recv_from_sock_buf;
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
if ((!bcast) &&
(dst_host_ids_entries_temp > KNET_MAX_HOST)) {
log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
}
/* Send to localhost if appropriate and enabled */
if (knet_h->has_loop_link) {
send_local = 0;
if (bcast) {
send_local = 1;
} else {
for (i=0; i< dst_host_ids_entries_temp; i++) {
if (dst_host_ids_temp[i] == knet_h->host_id) {
send_local = 1;
}
}
}
if (send_local) {
const unsigned char *buf = inbuf->khp_data_userdata;
ssize_t buflen = inlen;
struct knet_link *local_link;
local_link = knet_h->host_index[knet_h->host_id]->link;
local_retry:
err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
}
if (err > 0 && err < buflen) {
log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
local_link->status.stats.tx_data_retries++;
buf += err;
buflen -= err;
goto local_retry;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destinations hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid the value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* compress data
*/
if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = compress(knet_h,
(const unsigned char *)inbuf->khp_data_userdata, inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
savederrno = errno;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
stats_locked = 1;
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
if (err < 0) {
knet_h->stats.tx_failed_to_compress++;
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
} else {
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
if (cmp_outlen < inlen) {
memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
inlen = cmp_outlen;
data_compressed = 1;
} else {
knet_h->stats.tx_unable_to_compress++;
}
}
}
if (!stats_locked) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
}
if (knet_h->compress_model > 0 && !data_compressed) {
knet_h->stats.tx_uncompressed_packets++;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
stats_locked = 0;
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (data_compressed) {
inbuf->khp_data_compress = knet_h->compress_model;
} else {
inbuf->khp_data_compress = 0;
}
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and is rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in a 3+ node setup, where all the traffic is flowing between node 1 and 2,
*    node 3+ will be able to keep in sync on the TX seq_num even without
*    receiving traffic or pings in between. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx][1].iov_len = temp_data_mtu;
} else {
iov_out[frag_idx][1].iov_len = frag_len;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
iovcnt_out = 2;
} else {
iov_out[frag_idx][0].iov_base = (void *)inbuf;
iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
iovcnt_out = 1;
}
if (knet_h->crypto_in_use_config) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
err = -1;
savederrno = stats_err;
goto out_unlock;
}
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
uncrypted_frag_size = 0;
for (j=0; j < iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
iovcnt_out = 1;
}
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
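/*
 * pull one packet from an application fd (readv() for plain fds,
 * recvmsg() for sockets) and feed it to _parse_recv_from_sock().
 * read errors are reported back to the application via sock_notify_fn.
 */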
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
if ((channel >= 0) &&
(channel < KNET_DATAFD_MAX) &&
(!knet_h->sockfd[channel].is_socket)) {
inlen = readv(sockfd, msg->msg_iov, 1);
} else {
inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
if (msg->msg_flags & MSG_TRUNC) {
log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
return;
}
}
if (inlen == 0) {
savederrno = 0;
docallback = 1;
} else if (inlen < 0) {
struct epoll_event ev;
savederrno = errno;
docallback = 1;
memset(&ev, 0, sizeof(struct epoll_event));
if (channel != KNET_INTERNAL_DATA_CHANNEL) {
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
}
/*
* TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
* once we add support for internal knet communication
*/
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
}
if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_TX,
inlen,
savederrno);
}
}
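/*
 * TX thread entry point: wait for events on the application fds, map
 * each ready fd to its channel (or to the internal host info channel)
 * and process it while holding the tx_mutex.
 */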
void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
int i, nev, type;
int flush, flush_queue_limit;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
struct sockaddr_storage address;
set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);
+ memset(&events, 0, sizeof(events));
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
iov_in.iov_len = KNET_MAX_PACKET_SIZE;
memset(&msg, 0, sizeof(struct msghdr));
msg.msg_name = &address;
msg.msg_namelen = sizeof(struct sockaddr_storage);
msg.msg_iov = &iov_in;
msg.msg_iovlen = 1;
knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);
for (i = 0; i < PCKT_FRAG_MAX; i++) {
knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
}
flush_queue_limit = 0;
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);
flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
/*
* ideally we want to communicate that we are done flushing
* the queue when we have an epoll timeout event
*/
if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
flush_queue_limit = 0;
}
continue;
}
/*
* fall back in case the TX sockets continue to receive traffic
* and we do not hit an epoll timeout.
*
* allow up to 100 loops to flush queues, then we give up.
* there might be cleaner ways to do it by checking the buffer queue
* on each socket, but we have tons of sockets and calculations can go wrong.
* Also, why would you disable data forwarding and still send packets?
*/
if (flush == KNET_THREAD_QUEUE_FLUSH) {
if (flush_queue_limit >= 100) {
log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
flush_queue_limit = 0;
} else {
flush_queue_limit++;
}
} else {
flush_queue_limit = 0;
}
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
continue;
}
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->hostsockfd[0]) {
type = KNET_HEADER_TYPE_HOST_INFO;
channel = KNET_INTERNAL_DATA_CHANNEL;
} else {
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
if (channel >= KNET_DATAFD_MAX) {
log_debug(knet_h, KNET_SUB_TX, "No available channels");
continue; /* channel not found */
}
}
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
pthread_mutex_unlock(&knet_h->tx_mutex);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);
return NULL;
}
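/*
 * public API: synchronous send to a single unicast destination. the
 * packet is processed inline in the caller's context via
 * _parse_recv_from_sock() rather than going through the TX thread.
 */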
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
strerror(savederrno));
err = -1;
goto out;
}
knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
savederrno = errno;
pthread_mutex_unlock(&knet_h->tx_mutex);
out:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
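/*
 * public API: asynchronous send. the packet is written to the
 * channel's datafd and picked up by the TX thread via epoll.
 */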
ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_out[1];
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *)buff;
iov_out[0].iov_len = buff_len;
err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index 8df212a5..19c891d4 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1602 +1,1606 @@
/*
* Copyright (C) 2016-2020 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <assert.h>
#include "compat.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "links_acl_ip.h"
#include "logging.h"
#include "netutils.h"
#include "common.h"
#include "transport_common.h"
#include "transports.h"
#include "threads_common.h"
#ifdef HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
#include "transport_sctp.h"
typedef struct sctp_handle_info {
struct qb_list_head listen_links_list;
struct qb_list_head connect_links_list;
int connect_epollfd;
int connectsockfd[2];
int listen_epollfd;
int listensockfd[2];
pthread_t connect_thread;
pthread_t listen_thread;
socklen_t event_subscribe_kernel_size;
char *event_subscribe_buffer;
} sctp_handle_info_t;
/*
* used by the fd_tracker data type
*/
#define SCTP_NO_LINK_INFO 0
#define SCTP_LISTENER_LINK_INFO 1
#define SCTP_ACCEPTED_LINK_INFO 2
#define SCTP_CONNECT_LINK_INFO 3
/*
* this value is per listener
*/
#define MAX_ACCEPTED_SOCKS 256
typedef struct sctp_listen_link_info {
struct qb_list_head list;
int listen_sock;
int accepted_socks[MAX_ACCEPTED_SOCKS];
struct sockaddr_storage src_address;
int on_listener_epoll;
int on_rx_epoll;
int sock_shutdown;
} sctp_listen_link_info_t;
typedef struct sctp_accepted_link_info {
char mread_buf[KNET_DATABUFSIZE];
ssize_t mread_len;
sctp_listen_link_info_t *link_info;
} sctp_accepted_link_info_t;
typedef struct sctp_connect_link_info {
struct qb_list_head list;
sctp_listen_link_info_t *listener;
struct knet_link *link;
struct sockaddr_storage dst_address;
int connect_sock;
int on_rx_epoll;
int close_sock;
int sock_shutdown;
} sctp_connect_link_info_t;
/*
* socket handling functions
*
* these functions do NOT perform locking. locking
* must be handled in the right context by callers
*/
/*
* sockets are removed from rx_epoll by callers.
* see also the error handling functions
*/
static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
struct epoll_event ev;
sctp_connect_link_info_t *info = kn_link->transport_link;
if (info->connect_sock != -1) {
if (info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_rx_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
} else {
close(info->connect_sock);
info->connect_sock = -1;
}
}
exit_error:
errno = savederrno;
return err;
}
static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS,
handle_info->event_subscribe_buffer,
handle_info->event_subscribe_kernel_size) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
type, strerror(savederrno));
}
errno = savederrno;
return err;
}
static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
int level;
#ifdef SOL_SCTP
level = SOL_SCTP;
#else
level = IPPROTO_SCTP;
#endif
if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
value = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set reuseaddr on socket %d: %s",
sock, strerror(savederrno));
goto exit_error;
}
value = 1;
if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
strerror(savederrno));
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
savederrno = errno;
err = -1;
}
exit_error:
errno = savederrno;
return err;
}
static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP socket %d received error: %s", info->connect_sock, strerror(savederrno));
if ((savederrno != EALREADY) && (savederrno != EINPROGRESS) && (savederrno != EISCONN)) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
info->connect_sock, strerror(savederrno));
}
}
errno = savederrno;
return err;
}
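/*
 * Illustrative sketch (not part of the patch): the non-blocking
 * connect(2) retry idiom that _reconnect_socket() depends on.
 * EINPROGRESS and EALREADY mean the association is still being
 * established and EISCONN means it already completed; only other
 * errnos are real failures. The helper name is an assumption for
 * illustration only.
 */
static int example_nonblocking_connect(int sock, const struct sockaddr *sa, socklen_t salen)
{
	if (connect(sock, sa, salen) == 0) {
		return 0;			/* connected on the spot */
	}
	switch (errno) {
	case EINPROGRESS:			/* first attempt in flight */
	case EALREADY:				/* previous attempt in flight */
	case EISCONN:				/* already connected */
		return 0;			/* not an error: retry/poll later */
	default:
		return -1;			/* genuine failure */
	}
}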
static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
struct epoll_event ev;
sctp_connect_link_info_t *info = kn_link->transport_link;
int connect_sock;
struct sockaddr_storage connect_addr;
connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (connect_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(&connect_addr, 0, sizeof(struct sockaddr_storage));
if (knet_strtoaddr(kn_link->status.src_ipaddr, "0", &connect_addr, sockaddr_len(&connect_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to resolve connecting socket: %s",
strerror(savederrno));
goto exit_error;
}
if (bind(connect_sock, (struct sockaddr *)&connect_addr, sockaddr_len(&connect_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind connecting socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_rx_epoll = 1;
info->connect_sock = connect_sock;
info->close_sock = 0;
kn_link->outsock = info->connect_sock;
if (_reconnect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
exit_error:
if (err) {
if (connect_sock >= 0) {
close(connect_sock);
}
}
errno = savederrno;
return err;
}
static void _lock_sleep_relock(knet_handle_t knet_h)
{
int i = 0;
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
while (i < 5) {
usleep(knet_h->threads_timer_res / 16);
if (!pthread_rwlock_rdlock(&knet_h->global_rwlock)) {
/*
* lock acquired, we can return
*/
return;
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get read lock!");
i++;
}
}
/*
* time to crash! if we cannot re-acquire the lock
* there is no easy way out of this one
*/
assert(0);
}
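/*
 * Illustrative sketch (not part of the patch): the bounded
 * unlock/sleep/relock pattern above in generic form. Dropping the
 * read lock before sleeping lets writers make progress; if the lock
 * cannot be re-acquired after a few attempts the thread cannot
 * safely continue, hence the assert. Names are illustrative
 * assumptions.
 */
static void example_yield_and_relock(pthread_rwlock_t *lock, useconds_t pause)
{
	int tries;

	pthread_rwlock_unlock(lock);		/* don't sleep holding the lock */
	for (tries = 0; tries < 5; tries++) {
		usleep(pause);
		if (!pthread_rwlock_rdlock(lock)) {
			return;			/* lock re-acquired */
		}
	}
	assert(0);				/* no safe way to continue */
}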
int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
if (recv_err < 0) {
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
if (connect_info->link->transport_connected == 0) {
return -1;
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (listen_info->on_rx_epoll == 0) {
return -1;
}
}
break;
}
if (recv_errno == EAGAIN) {
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
_lock_sleep_relock(knet_h);
return 1;
}
return -1;
}
return 0;
}
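/*
 * Illustrative sketch (not part of the patch): how a TX path could
 * consume the return values above: 1 = the socket was only
 * overloaded (EAGAIN) and the error path already slept, so retry;
 * -1 = give up on this send. The helper name and the plain send()
 * call are assumptions for illustration only.
 */
static ssize_t example_tx_with_retry(knet_handle_t knet_h, int sockfd, const void *buf, size_t len)
{
	ssize_t sent;

retry:
	sent = send(sockfd, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
	if (sent < 0) {
		if (sctp_transport_tx_sock_error(knet_h, sockfd, -1, errno) == 1) {
			goto retry;		/* transient overload, try again */
		}
		return -1;			/* fatal for this socket */
	}
	return sent;
}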
/*
* socket error management functions
*
* both called with global read lock.
*
* NOTE: we need to remove the fd from the epoll as soon as possible
* even before we notify the respective thread to take care of it
* because scheduling can make it so that this thread will overload
* and the threads supposed to take care of the error will never
* be able to take action.
* we CANNOT handle FDs here directly (close/reconnect/etc) due
* to locking context. We need to delegate that to their respective
* management threads within the global write lock.
*
* this function is called from:
* - RX thread with recv_err <= 0 directly on recvmmsg error
* - transport_rx_is_data when msg_len == 0 (recv_err = 1)
* - transport_rx_is_data on notification (recv_err = 2)
*
* basically this small abuse of recv_err is to detect notifications
* generated by sockets created by listen().
*/
int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
struct epoll_event ev;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
/*
* all connect links have notifications enabled
* and we only act on notifications and
* generic recvmmsg errors.
*
* Errors generated by msg_len 0 can be ignored because
* they follow a notification (double notification)
*/
if (recv_err != 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (recv_err != 1) {
if (listen_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
listen_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
}
}
} else {
/*
* this means the listen() socket has generated
* a notification. now what? :-)
*/
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
}
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
break;
}
/*
* Under RX pressure we need to give time to IPC to pick up the message
*/
_lock_sleep_relock(knet_h);
return 0;
}
/*
* NOTE: sctp_transport_rx_is_data is called with global rdlock
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
size_t i;
struct iovec *iov = msg->msg_hdr.msg_iov;
size_t iovlen = msg->msg_hdr.msg_iovlen;
struct sctp_assoc_change *sac;
union sctp_notification *snp;
sctp_accepted_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
if (msg->msg_len == 0) {
/*
* NOTE: with event notification enabled, we receive error twice:
* 1) from the event notification
* 2) followed by a 0 byte msg_len
*
* the event handler should take care to avoid #2 by stopping
* the rx thread from processing more packets than necessary.
*/
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
if (connect_info->sock_shutdown) {
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
} else {
if (listen_info->link_info->sock_shutdown) {
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
}
/*
* this is pretty much dead code and we should never hit it.
* keep it for safety to prevent the rx thread from
* processing bad info / data.
*/
return KNET_TRANSPORT_RX_NOT_DATA_STOP;
}
/*
* missing MSG_EOR has to be treated as a short read
* from the socket and we need to fill in the mread buf
* while we wait for MSG_EOR
*/
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
/*
* copy the incoming data into mread_buf + mread_len (incremental)
* and increase mread_len
*/
memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
listen_info->mread_len = listen_info->mread_len + msg->msg_len;
return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE;
}
/*
* got EOR.
* if mread_len is > 0 we are completing a packet from short reads:
* finish reassembling the packet in mread_buf, copy it back into the iov
* and set the iov/msg len numbers (size) correctly
*/
if (listen_info->mread_len) {
/*
* add last fragment to mread_buf
*/
memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
listen_info->mread_len = listen_info->mread_len + msg->msg_len;
/*
* move all back into the iovec
*/
memmove(iov->iov_base, listen_info->mread_buf, listen_info->mread_len);
msg->msg_len = listen_info->mread_len;
listen_info->mread_len = 0;
}
return KNET_TRANSPORT_RX_IS_DATA;
}
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
return KNET_TRANSPORT_RX_NOT_DATA_STOP;
}
for (i = 0; i < iovlen; i++) {
snp = iov[i].iov_base;
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
sac = &snp->sn_assoc_change;
switch (sac->sac_state) {
case SCTP_COMM_LOST:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_lost", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->close_sock = 1;
connect_info->link->transport_connected = 0;
}
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
return KNET_TRANSPORT_RX_OOB_DATA_STOP;
break;
case SCTP_COMM_UP:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_up", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->link->transport_connected = 1;
}
break;
case SCTP_RESTART:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: restart", sockfd);
break;
case SCTP_SHUTDOWN_COMP:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: shutdown comp", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->close_sock = 1;
}
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
return KNET_TRANSPORT_RX_OOB_DATA_STOP;
break;
case SCTP_CANT_STR_ASSOC:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: cant str assoc", sockfd);
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: unknown %d", sockfd, sac->sac_state);
break;
}
break;
case SCTP_SHUTDOWN_EVENT:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event socket %d", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->link->transport_connected = 0;
connect_info->sock_shutdown = 1;
} else {
listen_info->link_info->sock_shutdown = 1;
}
break;
case SCTP_SEND_FAILED:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed socket: %d", sockfd);
break;
case SCTP_PEER_ADDR_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change socket %d", sockfd);
break;
case SCTP_REMOTE_ERROR:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error socket %d", sockfd);
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event socket: %d type: %hu", sockfd, snp->sn_header.sn_type);
break;
}
}
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
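/*
 * Illustrative sketch (not part of the patch): the bare-bones shape
 * of the notification parsing above. With SCTP event subscription
 * enabled, the kernel delivers events in-band: msg_flags carries
 * MSG_NOTIFICATION and the payload is a union sctp_notification.
 * The helper name is an assumption for illustration only.
 */
static int example_assoc_went_down(const struct msghdr *msg)
{
	const union sctp_notification *snp;
	const struct sctp_assoc_change *sac;

	if (!(msg->msg_flags & MSG_NOTIFICATION)) {
		return 0;			/* ordinary data, not an event */
	}
	snp = msg->msg_iov[0].iov_base;
	if (snp->sn_header.sn_type != SCTP_ASSOC_CHANGE) {
		return 0;			/* some other event type */
	}
	sac = &snp->sn_assoc_change;
	return (sac->sac_state == SCTP_COMM_LOST) ||
	       (sac->sac_state == SCTP_SHUTDOWN_COMP);
}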
int sctp_transport_link_is_down(knet_handle_t knet_h, struct knet_link *kn_link)
{
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info = kn_link->transport_link;
kn_link->transport_connected = 0;
info->close_sock = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received a link down event", info->connect_sock);
if (sendto(handle_info->connectsockfd[1], &info->connect_sock, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
return 0;
}
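/*
 * Illustrative sketch (not part of the patch): the socketpair
 * wake-up pattern used above. A thread that spots a bad fd writes
 * its number into one end of a socketpair; the managing thread,
 * whose epoll watches the other end, reads the number back and
 * handles the socket under the proper lock. Helper names are
 * assumptions for illustration only.
 */
static void example_notify_fd(int notify_sock, int bad_fd)
{
	/* non-blocking and no SIGPIPE: a failed wake-up is non-fatal */
	(void)sendto(notify_sock, &bad_fd, sizeof(int),
		     MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}

static int example_collect_fd(int notify_sock)
{
	int bad_fd = -1;

	if (recv(notify_sock, &bad_fd, sizeof(int),
		 MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
		return -1;			/* short read: drop the wake-up */
	}
	return bad_fd;
}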
/*
* connect / outgoing socket management thread
*/
/*
* _handle_connected_sctp* are called with a global write lock
* from the connect_thread
*/
static void _handle_connected_sctp_socket(knet_handle_t knet_h, int connect_sock)
{
int err;
unsigned int status;
socklen_t len = sizeof(status);
sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
struct knet_link *kn_link = info->link;
if (info->close_sock) {
if (_close_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp_socket: %s", connect_sock, strerror(errno));
return;
}
info->close_sock = 0;
if (_create_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
return;
}
}
_reconnect_socket(knet_h, info->link);
err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
if (err) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
connect_sock, strerror(errno));
return;
}
if (status) {
log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
strerror(status));
/*
* No need to create a new socket if connect failed,
* just retry connect
*/
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
connect_sock,
kn_link->status.dst_ipaddr, kn_link->status.dst_port);
}
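/*
 * Illustrative sketch (not part of the patch): the SO_ERROR idiom
 * used above to test whether a non-blocking connect completed.
 * Failure of getsockopt() itself is distinct from the socket
 * carrying a pending error; a status of 0 means the association is
 * established. The helper name is an assumption for illustration.
 */
static int example_connect_status(int sock)
{
	int status = 0;
	socklen_t len = sizeof(status);

	if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &status, &len) < 0) {
		return -1;			/* the query itself failed */
	}
	return status;				/* 0 = connected, else an errno */
}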
static void _handle_connected_sctp_notifications(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
return;
}
/*
* revalidate sockfd
*/
if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd);
_handle_connected_sctp_socket(knet_h, sockfd);
}
static void *_sctp_connect_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED);
+ memset(&events, 0, sizeof(events));
+
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
/*
* Sort out which FD has a connection
*/
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* minor optimization: deduplicate events
*
* in some cases we can receive multiple notifications
* of the same FD having issues or needing handling.
* It's enough to process them once, even though it's safe
* to handle them multiple times.
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->connectsockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
_handle_connected_sctp_notifications(knet_h);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification on connected sockfd %d\n", events[i].data.fd);
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* this thread can generate events for itself.
* we need to sleep in between loops to allow other threads
* to be scheduled
*/
usleep(knet_h->reconnect_int * 1000);
}
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED);
return NULL;
}
/*
* listen/incoming connections management thread
*/
/*
* Listener received a new connection
* called with a global write lock from the listen thread
*/
static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
{
int err = 0, savederrno = 0;
int new_fd;
int i = -1;
sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
struct epoll_event ev;
struct sockaddr_storage ss;
socklen_t sock_len = sizeof(ss);
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
sctp_accepted_link_info_t *accept_info = NULL;
new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
if (new_fd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno));
goto exit_error;
}
if (knet_addrtostr(&ss, sizeof(ss),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
addr_str, port_str);
if (knet_h->use_access_lists) {
if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) {
savederrno = EINVAL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str);
close(new_fd);
errno = savederrno;
return;
}
}
/*
* Keep track of all accepted FDs
*/
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] == -1) {
info->accepted_socks[i] = new_fd;
break;
}
}
if (i == MAX_ACCEPTED_SOCKS) {
errno = EBUSY;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
goto exit_error;
}
if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
savederrno = errno;
err = -1;
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
accept_info = malloc(sizeof(sctp_accepted_link_info_t));
if (!accept_info) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));
accept_info->link_info = info;
if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(errno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = new_fd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
new_fd, strerror(errno));
goto exit_error;
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
new_fd, addr_str, port_str, info->listen_sock, i);
exit_error:
if (err) {
if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) {
info->accepted_socks[i] = -1;
}
if (new_fd >= 0) {
_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
close(new_fd);
}
free(accept_info);
}
errno = savederrno;
return;
}
/*
* Listen thread received a notification of a bad socket that needs closing
* called with a global write lock from the listen thread
*/
static void _handle_listen_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_accepted_link_info_t *accept_info;
sctp_listen_link_info_t *info;
struct knet_host *host;
int link_idx;
int i;
if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
return;
}
/*
* revalidate sockfd
*/
if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);
accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
info = accept_info->link_info;
/*
* clear all links using this accepted socket as
* outbound dynamically connected socket
*/
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(host->link[link_idx].outsock == sockfd)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
host->host_id, link_idx, sockfd);
host->link[link_idx].status.dynconnected = 0;
host->link[link_idx].transport_connected = 0;
host->link[link_idx].outsock = 0;
memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
}
}
}
for (i = 0; i < MAX_ACCEPTED_SOCKS; i++) {
if (sockfd == info->accepted_socks[i]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
info->accepted_socks[i] = -1;
free(accept_info);
close(sockfd);
break; /* Keeps covscan happy */
}
}
}
static void *_sctp_listen_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED);
+ memset(&events, 0, sizeof(events));
+
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* Sort out which FD has an incoming connection
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->listensockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
_handle_listen_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_incoming_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED);
return NULL;
}
/*
* sctp_link_listener_start/stop are called in global write lock
* context from set_config and clear_config.
*/
static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int listen_sock = -1;
struct epoll_event ev;
sctp_listen_link_info_t *info = NULL;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* Only allocate a new listener if src address is different
*/
qb_list_for_each_entry(info, &handle_info->listen_links_list, list) {
if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
return NULL;
}
return info;
}
}
info = malloc(sizeof(sctp_listen_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(sctp_listen_link_info_t));
memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (listen_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (listen(listen_sock, 5) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 1;
info->listen_sock = listen_sock;
qb_list_add(&info->list, &handle_info->listen_links_list);
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);
exit_error:
if (err) {
if ((info) && (info->on_listener_epoll)) {
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
}
if (listen_sock >= 0) {
check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP);
close(listen_sock);
}
if (info) {
free(info);
info = NULL;
}
}
errno = savederrno;
return info;
}
static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int found = 0, i;
struct knet_host *host;
int link_idx;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
sctp_connect_link_info_t *link_info;
struct epoll_event ev;
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&host->link[link_idx] == kn_link)
continue;
link_info = host->link[link_idx].transport_link;
if ((link_info) &&
(link_info->listener == info)) {
found = 1;
break;
}
}
}
if ((check_rm(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", info->listen_sock);
}
if (found) {
this_link_info->listener = NULL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock);
savederrno = EBUSY;
err = -1;
goto exit_error;
}
if (info->on_listener_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
check_rmall(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP);
close(info->listen_sock);
for (i=0; i< MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] > -1) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->accepted_socks[i];
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 0;
free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data);
close(info->accepted_socks[i]);
if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->accepted_socks[i] = -1;
}
}
qb_list_del(&info->list);
free(info);
this_link_info->listener = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* Links config/clear. Both called with global wrlock from link_set_config/clear_config
*/
int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int savederrno = 0, err = 0;
sctp_connect_link_info_t *info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
info = malloc(sizeof(sctp_connect_link_info_t));
if (!info) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(sctp_connect_link_info_t));
kn_link->transport_link = info;
info->link = kn_link;
memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
info->connect_sock = -1;
info->listener = sctp_link_listener_start(knet_h, kn_link);
if (!info->listener) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (kn_link->dynamic == KNET_LINK_STATIC) {
if (_create_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
kn_link->outsock = info->connect_sock;
}
qb_list_add(&info->list, &handle_info->connect_links_list);
exit_error:
if (err) {
if (info) {
if (info->connect_sock >= 0) {
close(info->connect_sock);
}
if (info->listener) {
sctp_link_listener_stop(knet_h, kn_link);
}
kn_link->transport_link = NULL;
free(info);
}
}
errno = savederrno;
return err;
}
/*
* called with global wrlock
*/
int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info;
if (!kn_link) {
errno = EINVAL;
return -1;
}
info = kn_link->transport_link;
if (!info) {
errno = EINVAL;
return -1;
}
if ((sctp_link_listener_stop(knet_h, kn_link) < 0) && (errno != EBUSY)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener transport: %s",
strerror(savederrno));
goto exit_error;
}
if (_close_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
strerror(savederrno));
goto exit_error;
}
qb_list_del(&info->list);
free(info);
kn_link->transport_link = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* transport_free and transport_init are
* called only from knet_handle_new and knet_handle_free.
* all resources (hosts/links) should have been already freed at this point
* and they are called in a write locked context, hence they
* don't need their own locking.
*/
int sctp_transport_free(knet_handle_t knet_h)
{
sctp_handle_info_t *handle_info;
void *thread_status;
struct epoll_event ev;
if (!knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EINVAL;
return -1;
}
handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* keep it here while we debug list usage and such
*/
if (!qb_list_empty(&handle_info->listen_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
}
if (!qb_list_empty(&handle_info->connect_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
}
if (handle_info->listen_thread) {
pthread_cancel(handle_info->listen_thread);
pthread_join(handle_info->listen_thread, &thread_status);
}
if (handle_info->connect_thread) {
pthread_cancel(handle_info->connect_thread);
pthread_join(handle_info->connect_thread, &thread_status);
}
if (handle_info->listensockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev);
}
if (handle_info->connectsockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev);
}
_close_socketpair(knet_h, handle_info->connectsockfd);
_close_socketpair(knet_h, handle_info->listensockfd);
if (handle_info->listen_epollfd >= 0) {
close(handle_info->listen_epollfd);
}
if (handle_info->connect_epollfd >= 0) {
close(handle_info->connect_epollfd);
}
free(handle_info->event_subscribe_buffer);
free(handle_info);
knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;
return 0;
}
static int _sctp_subscribe_init(knet_handle_t knet_h)
{
int test_socket, savederrno;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
char dummy_events[100];
struct sctp_event_subscribe *events;
/* Below we set the first 6 fields of this expanding struct.
* SCTP_EVENTS is deprecated, but SCTP_EVENT is not available
* on Linux; on the other hand, FreeBSD and older Linux do not
* accept small transfers, so we can't simply use this minimum
* everywhere. Thus we query and store the native size. */
const unsigned int subscribe_min = 6;
test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP);
if (test_socket < 0) {
if (errno == EPROTONOSUPPORT) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization");
return 0;
}
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s",
strerror(savederrno));
return savederrno;
}
handle_info->event_subscribe_kernel_size = sizeof dummy_events;
if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events,
&handle_info->event_subscribe_kernel_size)) {
close(test_socket);
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s",
strerror(savederrno));
return savederrno;
}
close(test_socket);
if (handle_info->event_subscribe_kernel_size < subscribe_min) {
savederrno = ERANGE;
log_err(knet_h, KNET_SUB_TRANSP_SCTP,
"No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed",
handle_info->event_subscribe_kernel_size, subscribe_min);
return savederrno;
}
events = malloc(handle_info->event_subscribe_kernel_size);
if (!events) {
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP,
"Failed to allocate event subscribe buffer: %s", strerror(savederrno));
return savederrno;
}
memset(events, 0, handle_info->event_subscribe_kernel_size);
events->sctp_data_io_event = 1;
events->sctp_association_event = 1;
events->sctp_address_event = 1;
events->sctp_send_failure_event = 1;
events->sctp_peer_error_event = 1;
events->sctp_shutdown_event = 1;
handle_info->event_subscribe_buffer = (char *)events;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space",
handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe));
return 0;
}
int sctp_transport_init(knet_handle_t knet_h)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info;
struct epoll_event ev;
if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EEXIST;
return -1;
}
handle_info = malloc(sizeof(sctp_handle_info_t));
if (!handle_info) {
return -1;
}
memset(handle_info, 0, sizeof(sctp_handle_info_t));
knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;
savederrno = _sctp_subscribe_init(knet_h);
if (savederrno) {
err = -1;
goto exit_fail;
}
qb_list_init(&handle_info->listen_links_list);
qb_list_init(&handle_info->connect_links_list);
handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->listen_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->listen_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->connect_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->connect_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
/*
* Start connect & listener threads
*/
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
strerror(savederrno));
goto exit_fail;
}
exit_fail:
if (err < 0) {
sctp_transport_free(knet_h);
}
errno = savederrno;
return err;
}
int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
kn_link->outsock = sockfd;
kn_link->status.dynconnected = 1;
kn_link->transport_connected = 1;
return 0;
}
int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
{
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
return info->listen_sock;
}
#endif