Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 0e9ba432..3b047786 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,836 +1,840 @@
/*
* Copyright (C) 2012-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this functions needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
if (errno == ETIME) {
log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
}
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not is use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event when the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymettric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet lenght
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
int was_decrypted = 0;
uint64_t crypt_time = 0;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
if (knet_h->crypto_instance) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
if (crypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = crypt_time;
}
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
was_decrypted++;
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
src_link = NULL;
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
if (src_link->dynamic == KNET_LINK_DYNIP) {
/*
* cpyaddrport will only copy address and port of the incoming
* packet and strip extra bits such as flow and scopeid
*/
cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
&pckt_src, sockaddr_len(&pckt_src)) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
/*
* TODO: should we accept data even if we can't reply to the other node?
* how would that work with SCTP and guaranteed delivery?
*/
if (!src_host->status.reachable) {
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
//return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
if (src_link) {
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
if (!err) {
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->enabled != 1) /* data forward is disabled */
break;
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
crypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
+ if (dst_host_ids_entries > KNET_MAX_HOST) {
+ log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
+ return;
+ }
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
memset(iov_out, 0, sizeof(iov_out));
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed ping, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrives in bursts over all the link
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
knet_h->stats_extra.tx_crypt_pong_packets++;
}
retry_pong:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport_type, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
src_link->status.latency =
((src_link->status.latency * src_link->latency_exp) +
((latency_last / 1000llu) *
(src_link->latency_fix - src_link->latency_exp))) /
src_link->latency_fix;
if (src_link->status.latency < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.latency > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.latency;
}
if (src_link->status.latency < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.latency;
}
src_link->status.stats.latency_ave =
(src_link->status.stats.latency_ave * src_link->status.stats.latency_samples +
src_link->status.latency) / (src_link->status.stats.latency_samples+1);
src_link->status.stats.latency_samples++;
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr,
sizeof(struct sockaddr_storage));
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport_type, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
goto retry_pmtud;
break;
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
break;
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
break;
default:
return;
}
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if a fd got an event and before we grab the read lock
* and the link is removed by another thread
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
* it will be visibile in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol base
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocols packet handlers are good
*/
switch(err) {
case -1: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case 0: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case 1: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case 2: /* packet is data and should be parsed as such */
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
memset(&msg, 0, sizeof(msg));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
return NULL;
}
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 5d4062ef..3c7a7f3b 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,737 +1,745 @@
/*
* Copyright (C) 2012-2018 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under GPL-2.0+, LGPL-2.0+
*/
#include "config.h"
#include <math.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "link.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_tx.h"
#include "netutils.h"
/*
* SEND
*/
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0;
unsigned int i;
struct knet_mmsghdr *cur;
struct knet_link *cur_link;
for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sent_msgs = 0;
prev_sent = 0;
progress = 1;
cur_link = &dst_host->link[dst_host->active_links[link_idx]];
if (cur_link->transport_type == KNET_TRANSPORT_LOOPBACK) {
continue;
}
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
/* Cast for Linux/BSD compatibility */
for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
}
cur_link->status.stats.tx_data_packets++;
msg_idx++;
}
retry:
cur = &msg[prev_sent];
sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport_type, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
switch(err) {
case -1: /* unrecoverable error */
cur_link->status.stats.tx_data_errors++;
goto out_unlock;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
cur_link->status.stats.tx_data_retries++;
goto retry;
break;
}
prev_sent = prev_sent + sent_msgs;
if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
if ((sent_msgs) || (progress)) {
if (sent_msgs) {
progress = 1;
} else {
progress = 0;
}
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
sent_msgs, msg_idx,
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id);
#endif
goto retry;
}
if (!progress) {
savederrno = EAGAIN;
err = -1;
goto out_unlock;
}
}
if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
uint8_t cur_link_id = dst_host->active_links[0];
memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
break;
}
}
out_unlock:
errno = savederrno;
return err;
}
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
size_t dst_host_ids_entries_temp = 0;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[PCKT_FRAG_MAX][2];
int iovcnt_out = 2;
uint8_t frag_idx;
unsigned int temp_data_mtu;
size_t host_idx;
int send_mcast = 0;
struct knet_header *inbuf;
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;
unsigned int i;
int j;
int send_local = 0;
int data_compressed = 0;
size_t uncrypted_frag_size;
inbuf = knet_h->recv_from_sock_buf;
if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}
/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
inlen,
KNET_NOTIFY_TX,
knet_h->host_id,
knet_h->host_id,
&channel,
dst_host_ids_temp,
&dst_host_ids_entries_temp);
if (bcast < 0) {
log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
savederrno = EFAULT;
err = -1;
goto out_unlock;
}
if ((!bcast) && (!dst_host_ids_entries_temp)) {
log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
+
+ if ((!bcast) &&
+ (dst_host_ids_entries_temp > KNET_MAX_HOST)) {
+ log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
}
/* Send to localhost if appropriate and enabled */
if (knet_h->has_loop_link) {
send_local = 0;
if (bcast) {
send_local = 1;
} else {
for (i=0; i< dst_host_ids_entries_temp; i++) {
if (dst_host_ids_temp[i] == knet_h->host_id) {
send_local = 1;
}
}
}
if (send_local) {
const unsigned char *buf = inbuf->khp_data_userdata;
ssize_t buflen = inlen;
struct knet_link *local_link;
local_link = knet_h->host_index[knet_h->host_id]->link;
local_retry:
err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
if (err < 0) {
log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
local_link->status.stats.tx_data_errors++;
}
if (err > 0 && err < buflen) {
log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
local_link->status.stats.tx_data_retries++;
buf += err;
buflen -= err;
usleep(KNET_THREADS_TIMERES / 16);
goto local_retry;
}
if (err == buflen) {
local_link->status.stats.tx_data_packets++;
local_link->status.stats.tx_data_bytes += inlen;
}
}
}
break;
case KNET_HEADER_TYPE_HOST_INFO:
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
bcast = 0;
dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
dst_host_ids_entries_temp = 1;
knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
}
break;
default:
log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
savederrno = ENOMSG;
err = -1;
goto out_unlock;
break;
}
if (is_sync) {
if ((bcast) ||
((!bcast) && (dst_host_ids_entries_temp > 1))) {
log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
savederrno = E2BIG;
err = -1;
goto out_unlock;
}
}
/*
* check destinations hosts before spending time
* in fragmenting/encrypting packets to save
* time processing data for unreachable hosts.
* for unicast, also remap the destination data
* to skip unreachable hosts.
*/
if (!bcast) {
dst_host_ids_entries = 0;
for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
if (!dst_host) {
continue;
}
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
dst_host_ids_entries++;
}
}
if (!dst_host_ids_entries) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
} else {
send_mcast = 0;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (!(dst_host->host_id == knet_h->host_id &&
knet_h->has_loop_link) &&
dst_host->status.reachable) {
send_mcast = 1;
break;
}
}
if (!send_mcast) {
savederrno = EHOSTDOWN;
err = -1;
goto out_unlock;
}
}
if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_TX,
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming minimum IPv4 MTU (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}
/*
* compress data
*/
if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = compress(knet_h,
(const unsigned char *)inbuf->khp_data_userdata, inlen,
knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
if (err < 0) {
log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno));
} else {
/* Collect stats */
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (compress_time < knet_h->stats.tx_compress_time_min) {
knet_h->stats.tx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.tx_compress_time_max) {
knet_h->stats.tx_compress_time_max = compress_time;
}
knet_h->stats.tx_compress_time_ave =
(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
compress_time) / (knet_h->stats.tx_compressed_packets+1);
knet_h->stats.tx_compressed_packets++;
knet_h->stats.tx_compressed_original_bytes += inlen;
knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
if (cmp_outlen < inlen) {
memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
inlen = cmp_outlen;
data_compressed = 1;
}
}
}
if ((knet_h->compress_model > 0) && (inlen <= knet_h->compress_threshold)) {
knet_h->stats.tx_uncompressed_packets++;
}
/*
* prepare the outgoing buffers
*/
frag_len = inlen;
frag_idx = 0;
inbuf->khp_data_bcast = bcast;
inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
inbuf->khp_data_channel = channel;
if (data_compressed) {
inbuf->khp_data_compress = knet_h->compress_model;
} else {
inbuf->khp_data_compress = 0;
}
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
goto out_unlock;
}
knet_h->tx_seq_num++;
/*
* force seq_num 0 to detect a node that has crashed and rejoining
* the knet instance. seq_num 0 will clear the buffers in the RX
* thread
*/
if (knet_h->tx_seq_num == 0) {
knet_h->tx_seq_num++;
}
/*
* cache the value in locked context
*/
tx_seq_num = knet_h->tx_seq_num;
inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
/*
* forcefully broadcast a ping to all nodes every SEQ_MAX / 8
* pckts.
* this solves 2 problems:
* 1) on TX socket overloads we generate extra pings to keep links alive
* 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
* node 3+ will be able to keep in sync on the TX seq_num even without
* receiving traffic or pings in betweens. This avoids issues with
* rollover of the circular buffer
*/
if (tx_seq_num % (SEQ_MAX / 8) == 0) {
_send_pings(knet_h, 0);
}
if (inbuf->khp_data_frag_num > 1) {
while (frag_idx < inbuf->khp_data_frag_num) {
/*
* set the iov_base
*/
iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
/*
* set the len
*/
if (frag_len > temp_data_mtu) {
iov_out[frag_idx][1].iov_len = temp_data_mtu;
} else {
iov_out[frag_idx][1].iov_len = frag_len;
}
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
frag_len = frag_len - temp_data_mtu;
frag_idx++;
}
iovcnt_out = 2;
} else {
iov_out[frag_idx][0].iov_base = (void *)inbuf;
iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
iovcnt_out = 1;
}
if (knet_h->crypto_instance) {
struct timespec start_time;
struct timespec end_time;
uint64_t crypt_time;
frag_idx = 0;
while (frag_idx < inbuf->khp_data_frag_num) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_encrypt_and_signv(
knet_h,
iov_out[frag_idx], iovcnt_out,
knet_h->send_to_links_buf_crypt[frag_idx],
(ssize_t *)&outlen) < 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
savederrno = ECHILD;
err = -1;
goto out_unlock;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &crypt_time);
if (crypt_time < knet_h->stats.tx_crypt_time_min) {
knet_h->stats.tx_crypt_time_min = crypt_time;
}
if (crypt_time > knet_h->stats.tx_crypt_time_max) {
knet_h->stats.tx_crypt_time_max = crypt_time;
}
knet_h->stats.tx_crypt_time_ave =
(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
crypt_time) / (knet_h->stats.tx_crypt_packets+1);
uncrypted_frag_size = 0;
for (j=0; j < iovcnt_out; j++) {
uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
}
knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
knet_h->stats.tx_crypt_packets++;
iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
iov_out[frag_idx][0].iov_len = outlen;
frag_idx++;
}
iovcnt_out = 1;
}
memset(&msg, 0, sizeof(msg));
msgs_to_send = inbuf->khp_data_frag_num;
msg_idx = 0;
while (msg_idx < msgs_to_send) {
msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
msg_idx++;
}
if (!bcast) {
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
} else {
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
if (dst_host->status.reachable) {
err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
savederrno = errno;
if (err) {
goto out_unlock;
}
}
}
}
out_unlock:
errno = savederrno;
return err;
}
int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0, err = 0;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
strerror(savederrno));
err = -1;
goto out;
}
knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
savederrno = errno;
pthread_mutex_unlock(&knet_h->tx_mutex);
out:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
if ((channel >= 0) &&
(channel < KNET_DATAFD_MAX) &&
(!knet_h->sockfd[channel].is_socket)) {
inlen = readv(sockfd, msg->msg_iov, 1);
} else {
inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
}
if (inlen == 0) {
savederrno = 0;
docallback = 1;
goto out;
}
if (inlen < 0) {
savederrno = errno;
docallback = 1;
goto out;
}
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
out:
if (inlen < 0) {
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
} else {
knet_h->sockfd[channel].has_error = 1;
}
}
if (docallback) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_TX,
inlen,
savederrno);
}
}
void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
int i, nev, type;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
struct sockaddr_storage address;
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
iov_in.iov_len = KNET_MAX_PACKET_SIZE;
memset(&msg, 0, sizeof(struct msghdr));
msg.msg_name = &address;
msg.msg_namelen = sizeof(struct sockaddr_storage);
msg.msg_iov = &iov_in;
msg.msg_iovlen = 1;
knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);
for (i = 0; i < PCKT_FRAG_MAX; i++) {
knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
continue;
}
for (i = 0; i < nev; i++) {
if (events[i].data.fd == knet_h->hostsockfd[0]) {
type = KNET_HEADER_TYPE_HOST_INFO;
channel = -1;
} else {
type = KNET_HEADER_TYPE_DATA;
for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
if ((knet_h->sockfd[channel].in_use) &&
(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
break;
}
}
if (channel >= KNET_DATAFD_MAX) {
log_debug(knet_h, KNET_SUB_TX, "No available channels");
continue; /* channel not found */
}
}
if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
pthread_mutex_unlock(&knet_h->tx_mutex);
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
return NULL;
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Feb 25, 8:57 AM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464765
Default Alt Text
(47 KB)

Event Timeline