Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
new file mode 100644
index 00000000..37375346
--- /dev/null
+++ b/libknet/threads_rx.c
@@ -0,0 +1,1302 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/uio.h>
+
+#include "crypto.h"
+#include "compat.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "transports.h"
+#include "threads_common.h"
+#include "threads_heartbeat.h"
+#include "threads_send_recv.h"
+#include "netutils.h"
+
+/*
+ * SEND
+ */
+
+static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out)
+{
+ int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress;
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ int err = 0, savederrno = 0;
+
+ memset(&msg, 0, sizeof(struct mmsghdr));
+
+ for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
+
+ msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num;
+ sent_msgs = 0;
+ prev_sent = 0;
+ progress = 1;
+
+retry:
+ msg_idx = 0;
+
+ while (msg_idx < msgs_to_send) {
+ memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr));
+ msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
+ msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent];
+ msg[msg_idx].msg_hdr.msg_iovlen = 1;
+ msg_idx++;
+ }
+
+ sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
+ msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL);
+ savederrno = errno;
+
+ err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ goto out_unlock;
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry;
+ break;
+ }
+
+ if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) {
+ if ((sent_msgs) || (progress)) {
+ msgs_to_send = msg_idx - sent_msgs;
+ prev_sent = prev_sent + sent_msgs;
+ if (sent_msgs) {
+ progress = 1;
+ } else {
+ progress = 0;
+ }
+ log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
+ sent_msgs, msg_idx,
+ dst_host->name, dst_host->host_id,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
+ dst_host->link[dst_host->active_links[link_idx]].link_id);
+ goto retry;
+ }
+ if (!progress) {
+ savederrno = EAGAIN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
+ (dst_host->active_link_entries > 1)) {
+ uint8_t cur_link_id = dst_host->active_links[0];
+
+ memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
+ dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
+
+ break;
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
+{
+ ssize_t outlen, frag_len;
+ struct knet_host *dst_host;
+ uint16_t dst_host_ids_temp[KNET_MAX_HOST];
+ size_t dst_host_ids_entries_temp = 0;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[PCKT_FRAG_MAX];
+ uint8_t frag_idx;
+ unsigned int temp_data_mtu;
+ int host_idx;
+ int send_mcast = 0;
+ struct knet_header *inbuf;
+ int savederrno = 0;
+ int err = 0;
+ seq_num_t tx_seq_num;
+
+ inbuf = knet_h->recv_from_sock_buf[buf_idx];
+
+ if ((knet_h->enabled != 1) &&
+ (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
+ savederrno = ECANCELED;
+ err = -1;
+ goto out_unlock;
+ }
+
+ /*
+ * move this into a separate function to expand on
+ * extra switching rules
+ */
+ switch(inbuf->kh_type) {
+ case KNET_HEADER_TYPE_DATA:
+ if (knet_h->dst_host_filter_fn) {
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ inlen,
+ KNET_NOTIFY_TX,
+ knet_h->host_id,
+ knet_h->host_id,
+ &channel,
+ dst_host_ids_temp,
+ &dst_host_ids_entries_temp);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
+ savederrno = EFAULT;
+ err = -1;
+ goto out_unlock;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries_temp)) {
+ log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_HOST_INFO:
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
+ dst_host_ids_entries_temp = 1;
+ knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
+ }
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
+ savederrno = ENOMSG;
+ err = -1;
+ goto out_unlock;
+ break;
+ }
+
+ if (is_sync) {
+ if ((bcast) ||
+ ((!bcast) && (dst_host_ids_entries_temp > 1))) {
+ log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
+ savederrno = E2BIG;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ /*
+ * check destinations hosts before spending time
+ * in fragmenting/encrypting packets to save
+ * time processing data for unrechable hosts.
+ * for unicast, also remap the destination data
+ * to skip unreachable hosts.
+ */
+
+ if (!bcast) {
+ dst_host_ids_entries = 0;
+ for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
+ if (!dst_host) {
+ continue;
+ }
+ if (dst_host->status.reachable) {
+ dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
+ dst_host_ids_entries++;
+ }
+ }
+ if (!dst_host_ids_entries) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ } else {
+ send_mcast = 0;
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ send_mcast = 1;
+ break;
+ }
+ }
+ if (!send_mcast) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if (!knet_h->data_mtu) {
+ /*
+ * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
+ */
+ log_debug(knet_h, KNET_SUB_TX,
+ "Received data packet but data MTU is still unknown."
+ " Packet might not be delivered."
+ " Assuming mininum IPv4 mtu (%d)",
+ KNET_PMTUD_MIN_MTU_V4);
+ temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
+ } else {
+ /*
+ * take a copy of the mtu to avoid value changing under
+ * our feet while we are sending a fragmented pckt
+ */
+ temp_data_mtu = knet_h->data_mtu;
+ }
+
+ /*
+ * prepare the outgoing buffers
+ */
+
+ frag_len = inlen;
+ frag_idx = 0;
+
+ inbuf->khp_data_bcast = bcast;
+ inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
+ inbuf->khp_data_channel = channel;
+
+ if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
+ goto out_unlock;
+ }
+ knet_h->tx_seq_num++;
+ /*
+ * force seq_num 0 to detect a node that has crashed and rejoining
+ * the knet instance. seq_num 0 will clear the buffers in the RX
+ * thread
+ */
+ if (knet_h->tx_seq_num == 0) {
+ knet_h->tx_seq_num++;
+ }
+ /*
+ * cache the value in locked context
+ */
+ tx_seq_num = knet_h->tx_seq_num;
+ knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num);
+ pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
+
+ /*
+ * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
+ * pckts.
+ * this solves 2 problems:
+ * 1) on TX socket overloads we generate extra pings to keep links alive
+ * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
+ * node 3+ will be able to keep in sync on the TX seq_num even without
+ * receiving traffic or pings in betweens. This avoids issues with
+ * rollover of the circular buffer
+ */
+
+ if (tx_seq_num % (SEQ_MAX / 8) == 0) {
+ _send_pings(knet_h, 0);
+ }
+
+ while (frag_idx < inbuf->khp_data_frag_num) {
+ /*
+ * set the iov_base
+ */
+ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
+
+ /*
+ * set the len
+ */
+ if (frag_len > temp_data_mtu) {
+ iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+ } else {
+ iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
+ }
+
+ /*
+ * copy the frag info on all buffers
+ */
+ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
+
+ knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
+
+ memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
+ inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
+ iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);
+
+ frag_len = frag_len - temp_data_mtu;
+ frag_idx++;
+ }
+
+ if (knet_h->crypto_instance) {
+ frag_idx = 0;
+ while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
+ if (crypto_encrypt_and_sign(
+ knet_h,
+ (const unsigned char *)knet_h->send_to_links_buf[frag_idx],
+ iov_out[frag_idx].iov_len,
+ knet_h->send_to_links_buf_crypt[frag_idx],
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet");
+ savederrno = ECHILD;
+ err = -1;
+ goto out_unlock;
+ }
+ iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
+ iov_out[frag_idx].iov_len = outlen;
+ frag_idx++;
+ }
+ }
+
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids[host_idx]];
+
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ } else {
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+ int savederrno = 0, err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len <= 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len > KNET_MAX_PACKET_SIZE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel >= KNET_DATAFD_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (!knet_h->sockfd[channel].in_use) {
+ savederrno = EINVAL;
+ err = -1;
+ goto out;
+ }
+
+ savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
+ strerror(savederrno));
+ err = -1;
+ goto out;
+ }
+
+ knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
+ memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);
+ err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
+ savederrno = errno;
+
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+
+out:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+ errno = savederrno;
+ return err;
+}
+
+static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
+{
+ ssize_t inlen = 0;
+ struct iovec iov_in;
+ int msg_recv, i;
+ int savederrno = 0, docallback = 0;
+
+ if ((channel >= 0) &&
+ (channel < KNET_DATAFD_MAX) &&
+ (!knet_h->sockfd[channel].is_socket)) {
+ memset(&iov_in, 0, sizeof(iov_in));
+ iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
+ iov_in.iov_len = KNET_MAX_PACKET_SIZE;
+
+ inlen = readv(sockfd, &iov_in, 1);
+
+ if (inlen <= 0) {
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+
+ msg_recv = 1;
+ knet_h->recv_from_sock_buf[0]->kh_type = type;
+ _parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
+ } else {
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ if (msg_recv < 0) {
+ inlen = msg_recv;
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+ for (i = 0; i < msg_recv; i++) {
+ inlen = msg[i].msg_len;
+ if (inlen == 0) {
+ savederrno = 0;
+ docallback = 1;
+ goto out;
+ break;
+ }
+ knet_h->recv_from_sock_buf[i]->kh_type = type;
+ _parse_recv_from_sock(knet_h, i, inlen, channel, 0);
+ }
+ }
+
+out:
+ if (inlen < 0) {
+ struct epoll_event ev;
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
+ }
+
+ }
+
+ if (docallback) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_TX,
+ inlen,
+ savederrno);
+ }
+}
+
+void *_handle_send_to_links_thread(void *data)
+{
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+ int i, nev, type;
+ int8_t channel;
+
+ memset(&msg, 0, sizeof(struct mmsghdr));
+
+ /* preparing data buffer */
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
+ iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+ knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
+ knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);
+
+ knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
+ knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
+ continue;
+ }
+
+ for (i = 0; i < nev; i++) {
+ if (events[i].data.fd == knet_h->hostsockfd[0]) {
+ type = KNET_HEADER_TYPE_HOST_INFO;
+ channel = -1;
+ } else {
+ type = KNET_HEADER_TYPE_DATA;
+ for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
+ if ((knet_h->sockfd[channel].in_use) &&
+ (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
+ break;
+ }
+ }
+ }
+ if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
+ continue;
+ }
+ _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type);
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+ }
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
+ return NULL;
+}
+
+/*
+ * RECV
+ */
+
+/*
+ * return 1 if a > b
+ * return -1 if b > a
+ * return 0 if they are equal
+ */
+static inline int timecmp(struct timespec a, struct timespec b)
+{
+ if (a.tv_sec != b.tv_sec) {
+ if (a.tv_sec > b.tv_sec) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ if (a.tv_nsec > b.tv_nsec) {
+ return 1;
+ } else if (a.tv_nsec < b.tv_nsec) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+/*
+ * this functions needs to return an index (0 to 7)
+ * to a knet_host_defrag_buf. (-1 on errors)
+ */
+
+static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
+{
+ struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
+ int i, oldest;
+
+ /*
+ * check if there is a buffer already in use handling the same seq_num
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (src_host->defrag_buf[i].in_use) {
+ if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
+ return i;
+ }
+ }
+ }
+
+ /*
+ * If there is no buffer that's handling the current seq_num
+ * either it's new or it's been reclaimed already.
+ * check if it's been reclaimed/seen before using the defrag circular
+ * buffer. If the pckt has been seen before, the buffer expired (ETIME)
+ * and there is no point to try to defrag it again.
+ */
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
+ errno = ETIME;
+ return -1;
+ }
+
+ /*
+ * register the pckt as seen
+ */
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
+
+ /*
+ * see if there is a free buffer
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (!src_host->defrag_buf[i].in_use) {
+ return i;
+ }
+ }
+
+ /*
+ * at this point, there are no free buffers, the pckt is new
+ * and we need to reclaim a buffer, and we will take the one
+ * with the oldest timestamp. It's as good as any.
+ */
+
+ oldest = 0;
+
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
+ oldest = i;
+ }
+ }
+ src_host->defrag_buf[oldest].in_use = 0;
+ return oldest;
+}
+
+static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
+{
+ struct knet_host_defrag_buf *defrag_buf;
+ int defrag_buf_idx;
+
+ defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
+ if (defrag_buf_idx < 0) {
+ if (errno == ETIME) {
+ log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
+ }
+ return 1;
+ }
+
+ defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
+
+ /*
+ * if the buf is not is use, then make sure it's clean
+ */
+ if (!defrag_buf->in_use) {
+ memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
+ defrag_buf->in_use = 1;
+ defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
+ }
+
+ /*
+ * update timestamp on the buffer
+ */
+ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
+
+ /*
+ * check if we already received this fragment
+ */
+ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
+ /*
+ * if we have received this fragment and we didn't clear the buffer
+ * it means that we don't have all fragments yet
+ */
+ return 1;
+ }
+
+ /*
+ * we need to handle the last packet with gloves due to its different size
+ */
+
+ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
+ defrag_buf->last_frag_size = *len;
+
+ /*
+ * in the event when the last packet arrives first,
+ * we still don't know the offset vs the other fragments (based on MTU),
+ * so we store the fragment at the end of the buffer where it's safe
+ * and take a copy of the len so that we can restore its offset later.
+ * remember we can't use the local MTU for this calculation because pMTU
+ * can be asymettric between the same hosts.
+ */
+ if (!defrag_buf->frag_size) {
+ defrag_buf->last_first = 1;
+ memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
+ inbuf->khp_data_userdata,
+ *len);
+ }
+ } else {
+ defrag_buf->frag_size = *len;
+ }
+
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
+ inbuf->khp_data_userdata, *len);
+
+ defrag_buf->frag_recv++;
+ defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
+
+ /*
+ * check if we received all the fragments
+ */
+ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
+ /*
+ * special case the last pckt
+ */
+
+ if (defrag_buf->last_first) {
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
+ defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
+ defrag_buf->last_frag_size);
+ }
+
+ /*
+ * recalculate packet lenght
+ */
+
+ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
+
+ /*
+ * copy the pckt back in the user data
+ */
+ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
+
+ /*
+ * free this buffer
+ */
+ defrag_buf->in_use = 0;
+ return 0;
+ }
+
+ return 1;
+}
+
+static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg)
+{
+ int err = 0, savederrno = 0;
+ ssize_t outlen;
+ struct knet_host *src_host;
+ struct knet_link *src_link;
+ unsigned long long latency_last;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct timespec recvtime;
+ struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
+ unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
+ ssize_t len = msg->msg_len;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[1];
+ int8_t channel;
+ struct sockaddr_storage pckt_src;
+ seq_num_t recv_seq_num;
+ int wipe_bufs = 0;
+
+ if (knet_h->crypto_instance) {
+ if (crypto_authenticate_and_decrypt(knet_h,
+ (unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_decrypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
+ return;
+ }
+ len = outlen;
+ inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
+ }
+
+ if (len < (KNET_HEADER_SIZE + 1)) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len);
+ return;
+ }
+
+ if (inbuf->kh_version != KNET_HEADER_VERSION) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
+ return;
+ }
+
+ inbuf->kh_node = ntohs(inbuf->kh_node);
+ src_host = knet_h->host_index[inbuf->kh_node];
+ if (src_host == NULL) { /* host not found */
+ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
+ return;
+ }
+
+ src_link = NULL;
+
+ if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+ src_link = src_host->link +
+ (inbuf->khp_ping_link % KNET_MAX_LINK);
+ if (src_link->dynamic == KNET_LINK_DYNIP) {
+ /*
+ * cpyaddrport will only copy address and port of the incoming
+ * packet and strip extra bits such as flow and scopeid
+ */
+ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
+
+ if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
+ &pckt_src, sockaddr_len(&pckt_src)) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
+ src_host->host_id, src_link->link_id);
+ memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
+ if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
+ src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
+ src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
+ snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
+ snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
+ } else {
+ log_info(knet_h, KNET_SUB_RX,
+ "host: %u link: %u new connection established from: %s %s",
+ src_host->host_id, src_link->link_id,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ }
+ }
+ /*
+ * transport has already accepted the connection here
+ * otherwise we would not be receiving packets
+ */
+ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
+ }
+ }
+
+ switch (inbuf->kh_type) {
+ case KNET_HEADER_TYPE_HOST_INFO:
+ case KNET_HEADER_TYPE_DATA:
+ /*
+ * TODO: should we accept data even if we can't reply to the other node?
+ * how would that work with SCTP and guaranteed delivery?
+ */
+
+ if (!src_host->status.reachable) {
+ log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
+ //return;
+ }
+ inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
+ channel = inbuf->khp_data_channel;
+ src_host->got_data = 1;
+
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
+ }
+ return;
+ }
+
+ if (inbuf->khp_data_frag_num > 1) {
+ /*
+ * len as received from the socket also includes extra stuff
+ * that the defrag code doesn't care about. So strip it
+ * here and readd only for repadding once we are done
+ * defragging
+ */
+ len = len - KNET_HEADER_DATA_SIZE;
+ if (pckt_defrag(knet_h, inbuf, &len)) {
+ return;
+ }
+ len = len + KNET_HEADER_DATA_SIZE;
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (knet_h->enabled != 1) /* data forward is disabled */
+ break;
+
+ if (knet_h->dst_host_filter_fn) {
+ int host_idx;
+ int found = 0;
+
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ len - KNET_HEADER_DATA_SIZE,
+ KNET_NOTIFY_RX,
+ knet_h->host_id,
+ inbuf->kh_node,
+ &channel,
+ dst_host_ids,
+ &dst_host_ids_entries);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
+ return;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries)) {
+ log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ return;
+ }
+
+ /* check if we are dst for this packet */
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ if (dst_host_ids[host_idx] == knet_h->host_id) {
+ found = 1;
+ break;
+ }
+ }
+ if (!found) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
+ return;
+ }
+ }
+ }
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (!knet_h->sockfd[channel].in_use) {
+ log_debug(knet_h, KNET_SUB_RX,
+ "received packet for channel %d but there is no local sock connected",
+ channel);
+ return;
+ }
+
+ memset(iov_out, 0, sizeof(iov_out));
+ iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
+ iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
+
+ outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
+ if (outlen <= 0) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_RX,
+ outlen,
+ errno);
+ return;
+ }
+ if (outlen == iov_out[0].iov_len) {
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ }
+ } else { /* HOSTINFO */
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
+ }
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ return;
+ }
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ switch(knet_hostinfo->khi_type) {
+ case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
+ break;
+ case KNET_HOSTINFO_TYPE_LINK_TABLE:
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PING:
+ outlen = KNET_HEADER_PING_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PONG;
+ inbuf->kh_node = htons(knet_h->host_id);
+ recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
+
+ wipe_bufs = 0;
+
+ if (!inbuf->khp_ping_timed) {
+ /*
+ * we might be receiving this message from all links, but we want
+ * to process it only the first time
+ */
+ if (recv_seq_num != src_host->untimed_rx_seq_num) {
+ /*
+ * cache the untimed seq num
+ */
+ src_host->untimed_rx_seq_num = recv_seq_num;
+ /*
+ * if the host has received data in between
+ * untimed ping, then we don't need to wipe the bufs
+ */
+ if (src_host->got_data) {
+ src_host->got_data = 0;
+ wipe_bufs = 0;
+ } else {
+ wipe_bufs = 1;
+ }
+ }
+ _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
+ } else {
+ /*
+ * pings always arrives in bursts over all the link
+ * catch the first of them to cache the seq num and
+ * avoid duplicate processing
+ */
+ if (recv_seq_num != src_host->timed_rx_seq_num) {
+ src_host->timed_rx_seq_num = recv_seq_num;
+
+ if (recv_seq_num == 0) {
+ _seq_num_lookup(src_host, recv_seq_num, 0, 1);
+ }
+ }
+ }
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pong:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pong;
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PONG:
+ clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
+ memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
+ timespec_diff(recvtime,
+ src_link->status.pong_last, &latency_last);
+
+ src_link->status.latency =
+ ((src_link->status.latency * src_link->latency_exp) +
+ ((latency_last / 1000llu) *
+ (src_link->latency_fix - src_link->latency_exp))) /
+ src_link->latency_fix;
+
+ if (src_link->status.latency < src_link->pong_timeout) {
+ if (!src_link->status.connected) {
+ if (src_link->received_pong >= src_link->pong_count) {
+ log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
+ src_host->host_id, src_link->link_id);
+ _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+ } else {
+ src_link->received_pong++;
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
+ src_host->host_id, src_link->link_id, src_link->received_pong);
+ }
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD:
+ outlen = KNET_HEADER_PMTUD_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+ inbuf->kh_node = htons(knet_h->host_id);
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pmtud:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pmtud;
+ break;
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD_REPLY:
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
+ break;
+ }
+ src_link->last_recv_mtu = inbuf->khp_pmtud_size;
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ break;
+ default:
+ return;
+ }
+}
+
+static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
+{
+ int err, savederrno;
+ int i, msg_recv, transport;
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
+ return;
+ }
+
+ if (_is_valid_fd(knet_h, sockfd) < 1) {
+ /*
+ * this is normal if a fd got an event and before we grab the read lock
+ * and the link is removed by another thread
+ */
+ goto exit_unlock;
+ }
+
+ transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
+
+ /*
+ * reset msg_namelen to buffer size because after recvmmsg
+ * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
+ */
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ }
+
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ savederrno = errno;
+
+ /*
+ * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
+ * recvmmsg can return:
+ * -1 on error
+ * 0 if the previous run of recvmmsg recorded an error on the socket
+ * N number of messages (see exception below).
+ *
+ * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
+ * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
+ * it will be visibile in the next run.
+ *
+ * Need to be careful how we handle errors at this stage.
+ *
+ * error messages need to be handled on a per transport/protocol base
+ * at this point we have different layers of error handling
+ * - msg_recv < 0 -> error from this run
+ * msg_recv = 0 -> error from previous run and error on socket needs to be cleared
+ * - per-transport message data
+ * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
+ * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
+ * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
+ * errno set. That means the error api needs to be able to abort the loop below.
+ */
+
+ if (msg_recv <= 0) {
+ knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
+ goto exit_unlock;
+ }
+
+ for (i = 0; i < msg_recv; i++) {
+ err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
+
+ /*
+ * TODO: make this section silent once we are confident
+ * all protocols packet handlers are good
+ */
+
+ switch(err) {
+ case -1: /* on error */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
+ goto exit_unlock;
+ break;
+ case 0: /* packet is not data and we should continue the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
+ break;
+ case 1: /* packet is not data and we should STOP the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
+ goto exit_unlock;
+ break;
+ case 2: /* packet is data and should be parsed as such */
+ _parse_recv_from_links(knet_h, sockfd, &msg[i]);
+ break;
+ }
+ }
+
+exit_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+}
+
+void *_handle_recv_from_links_thread(void *data)
+{
+ int i, nev;
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+
+ memset(&msg, 0, sizeof(msg));
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
+ iov_in[i].iov_len = KNET_DATABUFSIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+ for (i = 0; i < nev; i++) {
+ _handle_recv_from_links(knet_h, events[i].data.fd, msg);
+ }
+ }
+
+ return NULL;
+}
diff --git a/libknet/threads_rx.h b/libknet/threads_rx.h
new file mode 100644
index 00000000..fec6e5f1
--- /dev/null
+++ b/libknet/threads_rx.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_SEND_RECV_H__
+#define __THREADS_SEND_RECV_H__
+
+void *_handle_send_to_links_thread(void *data);
+void *_handle_recv_from_links_thread(void *data);
+
+#endif
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
new file mode 100644
index 00000000..37375346
--- /dev/null
+++ b/libknet/threads_tx.c
@@ -0,0 +1,1302 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/uio.h>
+
+#include "crypto.h"
+#include "compat.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "transports.h"
+#include "threads_common.h"
+#include "threads_heartbeat.h"
+#include "threads_send_recv.h"
+#include "netutils.h"
+
+/*
+ * SEND
+ */
+
+static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out)
+{
+ int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress;
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ int err = 0, savederrno = 0;
+
+ memset(&msg, 0, sizeof(struct mmsghdr));
+
+ for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
+
+ msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num;
+ sent_msgs = 0;
+ prev_sent = 0;
+ progress = 1;
+
+retry:
+ msg_idx = 0;
+
+ while (msg_idx < msgs_to_send) {
+ memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr));
+ msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
+ msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent];
+ msg[msg_idx].msg_hdr.msg_iovlen = 1;
+ msg_idx++;
+ }
+
+ sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
+ msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL);
+ savederrno = errno;
+
+ err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ goto out_unlock;
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry;
+ break;
+ }
+
+ if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) {
+ if ((sent_msgs) || (progress)) {
+ msgs_to_send = msg_idx - sent_msgs;
+ prev_sent = prev_sent + sent_msgs;
+ if (sent_msgs) {
+ progress = 1;
+ } else {
+ progress = 0;
+ }
+ log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
+ sent_msgs, msg_idx,
+ dst_host->name, dst_host->host_id,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
+ dst_host->link[dst_host->active_links[link_idx]].link_id);
+ goto retry;
+ }
+ if (!progress) {
+ savederrno = EAGAIN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
+ (dst_host->active_link_entries > 1)) {
+ uint8_t cur_link_id = dst_host->active_links[0];
+
+ memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
+ dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
+
+ break;
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
+{
+ ssize_t outlen, frag_len;
+ struct knet_host *dst_host;
+ uint16_t dst_host_ids_temp[KNET_MAX_HOST];
+ size_t dst_host_ids_entries_temp = 0;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[PCKT_FRAG_MAX];
+ uint8_t frag_idx;
+ unsigned int temp_data_mtu;
+ int host_idx;
+ int send_mcast = 0;
+ struct knet_header *inbuf;
+ int savederrno = 0;
+ int err = 0;
+ seq_num_t tx_seq_num;
+
+ inbuf = knet_h->recv_from_sock_buf[buf_idx];
+
+ if ((knet_h->enabled != 1) &&
+ (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
+ savederrno = ECANCELED;
+ err = -1;
+ goto out_unlock;
+ }
+
+ /*
+ * move this into a separate function to expand on
+ * extra switching rules
+ */
+ switch(inbuf->kh_type) {
+ case KNET_HEADER_TYPE_DATA:
+ if (knet_h->dst_host_filter_fn) {
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ inlen,
+ KNET_NOTIFY_TX,
+ knet_h->host_id,
+ knet_h->host_id,
+ &channel,
+ dst_host_ids_temp,
+ &dst_host_ids_entries_temp);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
+ savederrno = EFAULT;
+ err = -1;
+ goto out_unlock;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries_temp)) {
+ log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_HOST_INFO:
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
+ dst_host_ids_entries_temp = 1;
+ knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
+ }
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
+ savederrno = ENOMSG;
+ err = -1;
+ goto out_unlock;
+ break;
+ }
+
+ if (is_sync) {
+ if ((bcast) ||
+ ((!bcast) && (dst_host_ids_entries_temp > 1))) {
+ log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
+ savederrno = E2BIG;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ /*
+ * check destinations hosts before spending time
+ * in fragmenting/encrypting packets to save
+ * time processing data for unrechable hosts.
+ * for unicast, also remap the destination data
+ * to skip unreachable hosts.
+ */
+
+ if (!bcast) {
+ dst_host_ids_entries = 0;
+ for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
+ if (!dst_host) {
+ continue;
+ }
+ if (dst_host->status.reachable) {
+ dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
+ dst_host_ids_entries++;
+ }
+ }
+ if (!dst_host_ids_entries) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ } else {
+ send_mcast = 0;
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ send_mcast = 1;
+ break;
+ }
+ }
+ if (!send_mcast) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if (!knet_h->data_mtu) {
+ /*
+ * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
+ */
+ log_debug(knet_h, KNET_SUB_TX,
+ "Received data packet but data MTU is still unknown."
+ " Packet might not be delivered."
+ " Assuming mininum IPv4 mtu (%d)",
+ KNET_PMTUD_MIN_MTU_V4);
+ temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
+ } else {
+ /*
+ * take a copy of the mtu to avoid value changing under
+ * our feet while we are sending a fragmented pckt
+ */
+ temp_data_mtu = knet_h->data_mtu;
+ }
+
+ /*
+ * prepare the outgoing buffers
+ */
+
+ frag_len = inlen;
+ frag_idx = 0;
+
+ inbuf->khp_data_bcast = bcast;
+ inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
+ inbuf->khp_data_channel = channel;
+
+ if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
+ goto out_unlock;
+ }
+ knet_h->tx_seq_num++;
+ /*
+ * force seq_num 0 to detect a node that has crashed and rejoining
+ * the knet instance. seq_num 0 will clear the buffers in the RX
+ * thread
+ */
+ if (knet_h->tx_seq_num == 0) {
+ knet_h->tx_seq_num++;
+ }
+ /*
+ * cache the value in locked context
+ */
+ tx_seq_num = knet_h->tx_seq_num;
+ knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num);
+ pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
+
+ /*
+ * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
+ * pckts.
+ * this solves 2 problems:
+ * 1) on TX socket overloads we generate extra pings to keep links alive
+ * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
+ * node 3+ will be able to keep in sync on the TX seq_num even without
+ * receiving traffic or pings in betweens. This avoids issues with
+ * rollover of the circular buffer
+ */
+
+ if (tx_seq_num % (SEQ_MAX / 8) == 0) {
+ _send_pings(knet_h, 0);
+ }
+
+ while (frag_idx < inbuf->khp_data_frag_num) {
+ /*
+ * set the iov_base
+ */
+ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
+
+ /*
+ * set the len
+ */
+ if (frag_len > temp_data_mtu) {
+ iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+ } else {
+ iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
+ }
+
+ /*
+ * copy the frag info on all buffers
+ */
+ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
+
+ knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
+
+ memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
+ inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
+ iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);
+
+ frag_len = frag_len - temp_data_mtu;
+ frag_idx++;
+ }
+
+ if (knet_h->crypto_instance) {
+ frag_idx = 0;
+ while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
+ if (crypto_encrypt_and_sign(
+ knet_h,
+ (const unsigned char *)knet_h->send_to_links_buf[frag_idx],
+ iov_out[frag_idx].iov_len,
+ knet_h->send_to_links_buf_crypt[frag_idx],
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet");
+ savederrno = ECHILD;
+ err = -1;
+ goto out_unlock;
+ }
+ iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
+ iov_out[frag_idx].iov_len = outlen;
+ frag_idx++;
+ }
+ }
+
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids[host_idx]];
+
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ } else {
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+ int savederrno = 0, err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len <= 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len > KNET_MAX_PACKET_SIZE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel >= KNET_DATAFD_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (!knet_h->sockfd[channel].in_use) {
+ savederrno = EINVAL;
+ err = -1;
+ goto out;
+ }
+
+ savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
+ strerror(savederrno));
+ err = -1;
+ goto out;
+ }
+
+ knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
+ memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);
+ err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
+ savederrno = errno;
+
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+
+out:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+ errno = savederrno;
+ return err;
+}
+
+static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
+{
+ ssize_t inlen = 0;
+ struct iovec iov_in;
+ int msg_recv, i;
+ int savederrno = 0, docallback = 0;
+
+ if ((channel >= 0) &&
+ (channel < KNET_DATAFD_MAX) &&
+ (!knet_h->sockfd[channel].is_socket)) {
+ memset(&iov_in, 0, sizeof(iov_in));
+ iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
+ iov_in.iov_len = KNET_MAX_PACKET_SIZE;
+
+ inlen = readv(sockfd, &iov_in, 1);
+
+ if (inlen <= 0) {
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+
+ msg_recv = 1;
+ knet_h->recv_from_sock_buf[0]->kh_type = type;
+ _parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
+ } else {
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ if (msg_recv < 0) {
+ inlen = msg_recv;
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+ for (i = 0; i < msg_recv; i++) {
+ inlen = msg[i].msg_len;
+ if (inlen == 0) {
+ savederrno = 0;
+ docallback = 1;
+ goto out;
+ break;
+ }
+ knet_h->recv_from_sock_buf[i]->kh_type = type;
+ _parse_recv_from_sock(knet_h, i, inlen, channel, 0);
+ }
+ }
+
+out:
+ if (inlen < 0) {
+ struct epoll_event ev;
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
+ }
+
+ }
+
+ if (docallback) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_TX,
+ inlen,
+ savederrno);
+ }
+}
+
+void *_handle_send_to_links_thread(void *data)
+{
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+ int i, nev, type;
+ int8_t channel;
+
+ memset(&msg, 0, sizeof(struct mmsghdr));
+
+ /* preparing data buffer */
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
+ iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+ knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
+ knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);
+
+ knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
+ knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
+ continue;
+ }
+
+ for (i = 0; i < nev; i++) {
+ if (events[i].data.fd == knet_h->hostsockfd[0]) {
+ type = KNET_HEADER_TYPE_HOST_INFO;
+ channel = -1;
+ } else {
+ type = KNET_HEADER_TYPE_DATA;
+ for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
+ if ((knet_h->sockfd[channel].in_use) &&
+ (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
+ break;
+ }
+ }
+ }
+ if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
+ continue;
+ }
+ _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type);
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+ }
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
+ return NULL;
+}
+
+/*
+ * RECV
+ */
+
+/*
+ * return 1 if a > b
+ * return -1 if b > a
+ * return 0 if they are equal
+ */
+static inline int timecmp(struct timespec a, struct timespec b)
+{
+ if (a.tv_sec != b.tv_sec) {
+ if (a.tv_sec > b.tv_sec) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ if (a.tv_nsec > b.tv_nsec) {
+ return 1;
+ } else if (a.tv_nsec < b.tv_nsec) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+/*
+ * this functions needs to return an index (0 to 7)
+ * to a knet_host_defrag_buf. (-1 on errors)
+ */
+
+static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
+{
+ struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
+ int i, oldest;
+
+ /*
+ * check if there is a buffer already in use handling the same seq_num
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (src_host->defrag_buf[i].in_use) {
+ if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
+ return i;
+ }
+ }
+ }
+
+ /*
+ * If there is no buffer that's handling the current seq_num
+ * either it's new or it's been reclaimed already.
+ * check if it's been reclaimed/seen before using the defrag circular
+ * buffer. If the pckt has been seen before, the buffer expired (ETIME)
+ * and there is no point to try to defrag it again.
+ */
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
+ errno = ETIME;
+ return -1;
+ }
+
+ /*
+ * register the pckt as seen
+ */
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
+
+ /*
+ * see if there is a free buffer
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (!src_host->defrag_buf[i].in_use) {
+ return i;
+ }
+ }
+
+ /*
+ * at this point, there are no free buffers, the pckt is new
+ * and we need to reclaim a buffer, and we will take the one
+ * with the oldest timestamp. It's as good as any.
+ */
+
+ oldest = 0;
+
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
+ oldest = i;
+ }
+ }
+ src_host->defrag_buf[oldest].in_use = 0;
+ return oldest;
+}
+
+static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
+{
+ struct knet_host_defrag_buf *defrag_buf;
+ int defrag_buf_idx;
+
+ defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
+ if (defrag_buf_idx < 0) {
+ if (errno == ETIME) {
+ log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
+ }
+ return 1;
+ }
+
+ defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
+
+ /*
+ * if the buf is not is use, then make sure it's clean
+ */
+ if (!defrag_buf->in_use) {
+ memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
+ defrag_buf->in_use = 1;
+ defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
+ }
+
+ /*
+ * update timestamp on the buffer
+ */
+ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
+
+ /*
+ * check if we already received this fragment
+ */
+ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
+ /*
+ * if we have received this fragment and we didn't clear the buffer
+ * it means that we don't have all fragments yet
+ */
+ return 1;
+ }
+
+ /*
+ * we need to handle the last packet with gloves due to its different size
+ */
+
+ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
+ defrag_buf->last_frag_size = *len;
+
+ /*
+ * in the event when the last packet arrives first,
+ * we still don't know the offset vs the other fragments (based on MTU),
+ * so we store the fragment at the end of the buffer where it's safe
+ * and take a copy of the len so that we can restore its offset later.
+ * remember we can't use the local MTU for this calculation because pMTU
+ * can be asymettric between the same hosts.
+ */
+ if (!defrag_buf->frag_size) {
+ defrag_buf->last_first = 1;
+ memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
+ inbuf->khp_data_userdata,
+ *len);
+ }
+ } else {
+ defrag_buf->frag_size = *len;
+ }
+
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
+ inbuf->khp_data_userdata, *len);
+
+ defrag_buf->frag_recv++;
+ defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
+
+ /*
+ * check if we received all the fragments
+ */
+ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
+ /*
+ * special case the last pckt
+ */
+
+ if (defrag_buf->last_first) {
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
+ defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
+ defrag_buf->last_frag_size);
+ }
+
+ /*
+ * recalculate packet lenght
+ */
+
+ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
+
+ /*
+ * copy the pckt back in the user data
+ */
+ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
+
+ /*
+ * free this buffer
+ */
+ defrag_buf->in_use = 0;
+ return 0;
+ }
+
+ return 1;
+}
+
+static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg)
+{
+ int err = 0, savederrno = 0;
+ ssize_t outlen;
+ struct knet_host *src_host;
+ struct knet_link *src_link;
+ unsigned long long latency_last;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct timespec recvtime;
+ struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
+ unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
+ ssize_t len = msg->msg_len;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[1];
+ int8_t channel;
+ struct sockaddr_storage pckt_src;
+ seq_num_t recv_seq_num;
+ int wipe_bufs = 0;
+
+ if (knet_h->crypto_instance) {
+ if (crypto_authenticate_and_decrypt(knet_h,
+ (unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_decrypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
+ return;
+ }
+ len = outlen;
+ inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
+ }
+
+ if (len < (KNET_HEADER_SIZE + 1)) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len);
+ return;
+ }
+
+ if (inbuf->kh_version != KNET_HEADER_VERSION) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
+ return;
+ }
+
+ inbuf->kh_node = ntohs(inbuf->kh_node);
+ src_host = knet_h->host_index[inbuf->kh_node];
+ if (src_host == NULL) { /* host not found */
+ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
+ return;
+ }
+
+ src_link = NULL;
+
+ if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+ src_link = src_host->link +
+ (inbuf->khp_ping_link % KNET_MAX_LINK);
+ if (src_link->dynamic == KNET_LINK_DYNIP) {
+ /*
+ * cpyaddrport will only copy address and port of the incoming
+ * packet and strip extra bits such as flow and scopeid
+ */
+ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
+
+ if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
+ &pckt_src, sockaddr_len(&pckt_src)) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
+ src_host->host_id, src_link->link_id);
+ memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
+ if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
+ src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
+ src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
+ snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
+ snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
+ } else {
+ log_info(knet_h, KNET_SUB_RX,
+ "host: %u link: %u new connection established from: %s %s",
+ src_host->host_id, src_link->link_id,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ }
+ }
+ /*
+ * transport has already accepted the connection here
+ * otherwise we would not be receiving packets
+ */
+ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
+ }
+ }
+
+ switch (inbuf->kh_type) {
+ case KNET_HEADER_TYPE_HOST_INFO:
+ case KNET_HEADER_TYPE_DATA:
+ /*
+ * TODO: should we accept data even if we can't reply to the other node?
+ * how would that work with SCTP and guaranteed delivery?
+ */
+
+ if (!src_host->status.reachable) {
+ log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
+ //return;
+ }
+ inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
+ channel = inbuf->khp_data_channel;
+ src_host->got_data = 1;
+
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
+ }
+ return;
+ }
+
+ if (inbuf->khp_data_frag_num > 1) {
+ /*
+ * len as received from the socket also includes extra stuff
+ * that the defrag code doesn't care about. So strip it
+ * here and readd only for repadding once we are done
+ * defragging
+ */
+ len = len - KNET_HEADER_DATA_SIZE;
+ if (pckt_defrag(knet_h, inbuf, &len)) {
+ return;
+ }
+ len = len + KNET_HEADER_DATA_SIZE;
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (knet_h->enabled != 1) /* data forward is disabled */
+ break;
+
+ if (knet_h->dst_host_filter_fn) {
+ int host_idx;
+ int found = 0;
+
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ len - KNET_HEADER_DATA_SIZE,
+ KNET_NOTIFY_RX,
+ knet_h->host_id,
+ inbuf->kh_node,
+ &channel,
+ dst_host_ids,
+ &dst_host_ids_entries);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
+ return;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries)) {
+ log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ return;
+ }
+
+ /* check if we are dst for this packet */
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ if (dst_host_ids[host_idx] == knet_h->host_id) {
+ found = 1;
+ break;
+ }
+ }
+ if (!found) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
+ return;
+ }
+ }
+ }
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (!knet_h->sockfd[channel].in_use) {
+ log_debug(knet_h, KNET_SUB_RX,
+ "received packet for channel %d but there is no local sock connected",
+ channel);
+ return;
+ }
+
+ memset(iov_out, 0, sizeof(iov_out));
+ iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
+ iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
+
+ outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
+ if (outlen <= 0) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_RX,
+ outlen,
+ errno);
+ return;
+ }
+ if (outlen == iov_out[0].iov_len) {
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ }
+ } else { /* HOSTINFO */
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
+ }
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ return;
+ }
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ switch(knet_hostinfo->khi_type) {
+ case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
+ break;
+ case KNET_HOSTINFO_TYPE_LINK_TABLE:
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PING:
+ outlen = KNET_HEADER_PING_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PONG;
+ inbuf->kh_node = htons(knet_h->host_id);
+ recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
+
+ wipe_bufs = 0;
+
+ if (!inbuf->khp_ping_timed) {
+ /*
+ * we might be receiving this message from all links, but we want
+ * to process it only the first time
+ */
+ if (recv_seq_num != src_host->untimed_rx_seq_num) {
+ /*
+ * cache the untimed seq num
+ */
+ src_host->untimed_rx_seq_num = recv_seq_num;
+ /*
+ * if the host has received data in between
+ * untimed ping, then we don't need to wipe the bufs
+ */
+ if (src_host->got_data) {
+ src_host->got_data = 0;
+ wipe_bufs = 0;
+ } else {
+ wipe_bufs = 1;
+ }
+ }
+ _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
+ } else {
+ /*
+ * pings always arrives in bursts over all the link
+ * catch the first of them to cache the seq num and
+ * avoid duplicate processing
+ */
+ if (recv_seq_num != src_host->timed_rx_seq_num) {
+ src_host->timed_rx_seq_num = recv_seq_num;
+
+ if (recv_seq_num == 0) {
+ _seq_num_lookup(src_host, recv_seq_num, 0, 1);
+ }
+ }
+ }
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pong:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pong;
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PONG:
+ clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
+ memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
+ timespec_diff(recvtime,
+ src_link->status.pong_last, &latency_last);
+
+ src_link->status.latency =
+ ((src_link->status.latency * src_link->latency_exp) +
+ ((latency_last / 1000llu) *
+ (src_link->latency_fix - src_link->latency_exp))) /
+ src_link->latency_fix;
+
+ if (src_link->status.latency < src_link->pong_timeout) {
+ if (!src_link->status.connected) {
+ if (src_link->received_pong >= src_link->pong_count) {
+ log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
+ src_host->host_id, src_link->link_id);
+ _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+ } else {
+ src_link->received_pong++;
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
+ src_host->host_id, src_link->link_id, src_link->received_pong);
+ }
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD:
+ outlen = KNET_HEADER_PMTUD_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+ inbuf->kh_node = htons(knet_h->host_id);
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pmtud:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pmtud;
+ break;
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD_REPLY:
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
+ break;
+ }
+ src_link->last_recv_mtu = inbuf->khp_pmtud_size;
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ break;
+ default:
+ return;
+ }
+}
+
+static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
+{
+ int err, savederrno;
+ int i, msg_recv, transport;
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
+ return;
+ }
+
+ if (_is_valid_fd(knet_h, sockfd) < 1) {
+ /*
+ * this is normal if a fd got an event and before we grab the read lock
+ * and the link is removed by another thread
+ */
+ goto exit_unlock;
+ }
+
+ transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
+
+ /*
+ * reset msg_namelen to buffer size because after recvmmsg
+ * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
+ */
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ }
+
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ savederrno = errno;
+
+ /*
+ * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
+ * recvmmsg can return:
+ * -1 on error
+ * 0 if the previous run of recvmmsg recorded an error on the socket
+ * N number of messages (see exception below).
+ *
+ * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
+ * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
+ * it will be visibile in the next run.
+ *
+ * Need to be careful how we handle errors at this stage.
+ *
+ * error messages need to be handled on a per transport/protocol base
+ * at this point we have different layers of error handling
+ * - msg_recv < 0 -> error from this run
+ * msg_recv = 0 -> error from previous run and error on socket needs to be cleared
+ * - per-transport message data
+ * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
+ * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
+ * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
+ * errno set. That means the error api needs to be able to abort the loop below.
+ */
+
+ if (msg_recv <= 0) {
+ knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
+ goto exit_unlock;
+ }
+
+ for (i = 0; i < msg_recv; i++) {
+ err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
+
+ /*
+ * TODO: make this section silent once we are confident
+ * all protocols packet handlers are good
+ */
+
+ switch(err) {
+ case -1: /* on error */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
+ goto exit_unlock;
+ break;
+ case 0: /* packet is not data and we should continue the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
+ break;
+ case 1: /* packet is not data and we should STOP the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
+ goto exit_unlock;
+ break;
+ case 2: /* packet is data and should be parsed as such */
+ _parse_recv_from_links(knet_h, sockfd, &msg[i]);
+ break;
+ }
+ }
+
+exit_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+}
+
+void *_handle_recv_from_links_thread(void *data)
+{
+ int i, nev;
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+
+ memset(&msg, 0, sizeof(msg));
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
+ iov_in[i].iov_len = KNET_DATABUFSIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+ for (i = 0; i < nev; i++) {
+ _handle_recv_from_links(knet_h, events[i].data.fd, msg);
+ }
+ }
+
+ return NULL;
+}
diff --git a/libknet/threads_tx.h b/libknet/threads_tx.h
new file mode 100644
index 00000000..fec6e5f1
--- /dev/null
+++ b/libknet/threads_tx.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_SEND_RECV_H__
+#define __THREADS_SEND_RECV_H__
+
+void *_handle_send_to_links_thread(void *data);
+void *_handle_recv_from_links_thread(void *data);
+
+#endif

File Metadata

Mime Type
text/x-diff
Expires
Wed, Jun 25, 7:07 AM (11 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1952496
Default Alt Text
(76 KB)

Event Timeline