diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
new file mode 100644
index 00000000..37375346
--- /dev/null
+++ b/libknet/threads_rx.c
@@ -0,0 +1,1302 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/uio.h>
+
+#include "crypto.h"
+#include "compat.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "transports.h"
+#include "threads_common.h"
+#include "threads_heartbeat.h"
+#include "threads_send_recv.h"
+#include "netutils.h"
+
+/*
+ * SEND
+ */
+
+static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out)
+{
+ int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress;
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ int err = 0, savederrno = 0;
+
+ memset(&msg, 0, sizeof(msg)); /* zero the whole array, not just the first entry */
+
+ for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
+
+ msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num;
+ sent_msgs = 0;
+ prev_sent = 0;
+ progress = 1;
+
+retry:
+ msg_idx = 0;
+
+ while (msg_idx < msgs_to_send) {
+ memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr));
+ msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
+ msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent];
+ msg[msg_idx].msg_hdr.msg_iovlen = 1;
+ msg_idx++;
+ }
+
+ sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
+ msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL);
+ savederrno = errno;
+
+ err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ goto out_unlock;
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry;
+ break;
+ }
+
+ if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) {
+ if ((sent_msgs) || (progress)) {
+ msgs_to_send = msg_idx - sent_msgs;
+ prev_sent = prev_sent + sent_msgs;
+ if (sent_msgs) {
+ progress = 1;
+ } else {
+ progress = 0;
+ }
+ log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
+ sent_msgs, msg_idx,
+ dst_host->name, dst_host->host_id,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
+ dst_host->link[dst_host->active_links[link_idx]].link_id);
+ goto retry;
+ }
+ if (!progress) {
+ savederrno = EAGAIN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
+ (dst_host->active_link_entries > 1)) {
+ uint8_t cur_link_id = dst_host->active_links[0];
+
+ memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
+ dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
+
+ break;
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
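The KNET_LINK_POLICY_RR branch above rotates the active link array after every dispatch so that the next packet leaves on a different link. Below is a minimal sketch of that rotation in isolation; the array size macro and helper name are illustrative, not part of the patch.

#include <stdint.h>
#include <string.h>

#define MAX_LINKS 8 /* stand-in for KNET_MAX_LINK */

/* move the link we just used to the back of the queue */
static void rotate_links(uint8_t links[MAX_LINKS], size_t entries)
{
	uint8_t first = links[0];

	memmove(&links[0], &links[1], MAX_LINKS - 1);
	links[entries - 1] = first;
}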
+static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
+{
+ ssize_t outlen, frag_len;
+ struct knet_host *dst_host;
+ uint16_t dst_host_ids_temp[KNET_MAX_HOST];
+ size_t dst_host_ids_entries_temp = 0;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[PCKT_FRAG_MAX];
+ uint8_t frag_idx;
+ unsigned int temp_data_mtu;
+ int host_idx;
+ int send_mcast = 0;
+ struct knet_header *inbuf;
+ int savederrno = 0;
+ int err = 0;
+ seq_num_t tx_seq_num;
+
+ inbuf = knet_h->recv_from_sock_buf[buf_idx];
+
+ if ((knet_h->enabled != 1) &&
+ (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
+ savederrno = ECANCELED;
+ err = -1;
+ goto out_unlock;
+ }
+
+ /*
+ * move this into a separate function to expand on
+ * extra switching rules
+ */
+ switch(inbuf->kh_type) {
+ case KNET_HEADER_TYPE_DATA:
+ if (knet_h->dst_host_filter_fn) {
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ inlen,
+ KNET_NOTIFY_TX,
+ knet_h->host_id,
+ knet_h->host_id,
+ &channel,
+ dst_host_ids_temp,
+ &dst_host_ids_entries_temp);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
+ savederrno = EFAULT;
+ err = -1;
+ goto out_unlock;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries_temp)) {
+ log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_HOST_INFO:
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
+ dst_host_ids_entries_temp = 1;
+ knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
+ }
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
+ savederrno = ENOMSG;
+ err = -1;
+ goto out_unlock;
+ break;
+ }
+
+ if (is_sync) {
+ if ((bcast) ||
+ ((!bcast) && (dst_host_ids_entries_temp > 1))) {
+ log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
+ savederrno = E2BIG;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ /*
+ * check destination hosts before spending time
+ * fragmenting/encrypting packets, to save
+ * time processing data for unreachable hosts.
+ * for unicast, also remap the destination data
+ * to skip unreachable hosts.
+ */
+
+ if (!bcast) {
+ dst_host_ids_entries = 0;
+ for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
+ if (!dst_host) {
+ continue;
+ }
+ if (dst_host->status.reachable) {
+ dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
+ dst_host_ids_entries++;
+ }
+ }
+ if (!dst_host_ids_entries) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ } else {
+ send_mcast = 0;
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ send_mcast = 1;
+ break;
+ }
+ }
+ if (!send_mcast) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if (!knet_h->data_mtu) {
+ /*
+ * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
+ */
+ log_debug(knet_h, KNET_SUB_TX,
+ "Received data packet but data MTU is still unknown."
+ " Packet might not be delivered."
+ " Assuming mininum IPv4 mtu (%d)",
+ KNET_PMTUD_MIN_MTU_V4);
+ temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
+ } else {
+ /*
+ * take a copy of the mtu to avoid value changing under
+ * our feet while we are sending a fragmented pckt
+ */
+ temp_data_mtu = knet_h->data_mtu;
+ }
+
+ /*
+ * prepare the outgoing buffers
+ */
+
+ frag_len = inlen;
+ frag_idx = 0;
+
+ inbuf->khp_data_bcast = bcast;
+ inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
+ inbuf->khp_data_channel = channel;
+
+ if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
+ goto out_unlock;
+ }
+ knet_h->tx_seq_num++;
+ /*
+ * force seq_num 0 to detect a node that has crashed and is rejoining
+ * the knet instance. seq_num 0 will clear the buffers in the RX
+ * thread
+ */
+ if (knet_h->tx_seq_num == 0) {
+ knet_h->tx_seq_num++;
+ }
+ /*
+ * cache the value in locked context
+ */
+ tx_seq_num = knet_h->tx_seq_num;
+ knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num);
+ pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
+
+ /*
+ * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
+ * pckts.
+ * this solves 2 problems:
+ * 1) on TX socket overloads we generate extra pings to keep links alive
+ * 2) in a 3+ node setup, where all the traffic is flowing between nodes 1 and 2,
+ * node 3+ will be able to keep in sync on the TX seq_num even without
+ * receiving traffic or pings in between. This avoids issues with
+ * rollover of the circular buffer
+ */
+
+ if (tx_seq_num % (SEQ_MAX / 8) == 0) {
+ _send_pings(knet_h, 0);
+ }
+
+ while (frag_idx < inbuf->khp_data_frag_num) {
+ /*
+ * set the iov_base
+ */
+ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
+
+ /*
+ * set the len
+ */
+ if (frag_len > temp_data_mtu) {
+ iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+ } else {
+ iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
+ }
+
+ /*
+ * copy the frag info on all buffers
+ */
+ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
+
+ knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
+
+ memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
+ inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
+ iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);
+
+ frag_len = frag_len - temp_data_mtu;
+ frag_idx++;
+ }
+
+ if (knet_h->crypto_instance) {
+ frag_idx = 0;
+ while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
+ if (crypto_encrypt_and_sign(
+ knet_h,
+ (const unsigned char *)knet_h->send_to_links_buf[frag_idx],
+ iov_out[frag_idx].iov_len,
+ knet_h->send_to_links_buf_crypt[frag_idx],
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet");
+ savederrno = ECHILD;
+ err = -1;
+ goto out_unlock;
+ }
+ iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
+ iov_out[frag_idx].iov_len = outlen;
+ frag_idx++;
+ }
+ }
+
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids[host_idx]];
+
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ } else {
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
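The fragmentation above sizes the packet train as ceil(inlen / mtu) fragments, where every fragment except the last carries exactly one MTU's worth of user data plus the data header. A self-contained sketch of the same sizing, using integer arithmetic instead of libm's ceil(); the header size and sample values are illustrative only.

#include <stdio.h>
#include <sys/types.h>

#define HDR_SIZE 32 /* stand-in for KNET_HEADER_DATA_SIZE */

int main(void)
{
	ssize_t inlen = 70000, mtu = 1432, frag_len = inlen;
	int frag_num = (inlen + mtu - 1) / mtu; /* integer ceil */
	int idx;

	for (idx = 1; idx <= frag_num; idx++) {
		ssize_t payload = (frag_len > mtu) ? mtu : frag_len;
		printf("frag %d/%d: %zd bytes payload + %d bytes header\n",
		       idx, frag_num, payload, HDR_SIZE);
		frag_len -= mtu;
	}
	return 0;
}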
+int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+ int savederrno = 0, err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len <= 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len > KNET_MAX_PACKET_SIZE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel >= KNET_DATAFD_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (!knet_h->sockfd[channel].in_use) {
+ savederrno = EINVAL;
+ err = -1;
+ goto out;
+ }
+
+ savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
+ strerror(savederrno));
+ err = -1;
+ goto out;
+ }
+
+ knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
+ memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);
+ err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
+ savederrno = errno;
+
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+
+out:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+ errno = savederrno;
+ return err;
+}
+
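A minimal caller of knet_send_sync() as defined above; it assumes a handle that already has a data fd registered on the given channel, and that the public libknet.h header is available. The helper name and payload are illustrative.

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include "libknet.h"

static int send_one(knet_handle_t knet_h, int8_t channel)
{
	const char payload[] = "hello";

	/* on failure errno is set by the validation/locking paths above */
	if (knet_send_sync(knet_h, payload, sizeof(payload), channel) < 0) {
		fprintf(stderr, "knet_send_sync: %s\n", strerror(errno));
		return -1;
	}
	return 0;
}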
+static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
+{
+ ssize_t inlen = 0;
+ struct iovec iov_in;
+ int msg_recv, i;
+ int savederrno = 0, docallback = 0;
+
+ if ((channel >= 0) &&
+ (channel < KNET_DATAFD_MAX) &&
+ (!knet_h->sockfd[channel].is_socket)) {
+ memset(&iov_in, 0, sizeof(iov_in));
+ iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
+ iov_in.iov_len = KNET_MAX_PACKET_SIZE;
+
+ inlen = readv(sockfd, &iov_in, 1);
+
+ if (inlen <= 0) {
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+
+ msg_recv = 1;
+ knet_h->recv_from_sock_buf[0]->kh_type = type;
+ _parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
+ } else {
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ if (msg_recv < 0) {
+ inlen = msg_recv;
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+ for (i = 0; i < msg_recv; i++) {
+ inlen = msg[i].msg_len;
+ if (inlen == 0) {
+ savederrno = 0;
+ docallback = 1;
+ goto out;
+ break;
+ }
+ knet_h->recv_from_sock_buf[i]->kh_type = type;
+ _parse_recv_from_sock(knet_h, i, inlen, channel, 0);
+ }
+ }
+
+out:
+ if (inlen < 0) {
+ struct epoll_event ev;
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
+ }
+
+ }
+
+ if (docallback) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_TX,
+ inlen,
+ savederrno);
+ }
+}
+
+void *_handle_send_to_links_thread(void *data)
+{
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+ int i, nev, type;
+ int8_t channel;
+
+ memset(&msg, 0, sizeof(msg)); /* zero the whole array, not just the first entry */
+
+ /* preparing data buffer */
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
+ iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+ knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
+ knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);
+
+ knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
+ knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); /* events[] holds at most KNET_EPOLL_MAX_EVENTS entries */
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
+ continue;
+ }
+
+ for (i = 0; i < nev; i++) {
+ if (events[i].data.fd == knet_h->hostsockfd[0]) {
+ type = KNET_HEADER_TYPE_HOST_INFO;
+ channel = -1;
+ } else {
+ type = KNET_HEADER_TYPE_DATA;
+ for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
+ if ((knet_h->sockfd[channel].in_use) &&
+ (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
+ break;
+ }
+ }
+ }
+ if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
+ continue;
+ }
+ _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type);
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+ }
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
+ return NULL;
+}
+
+/*
+ * RECV
+ */
+
+/*
+ * return 1 if a > b
+ * return -1 if b > a
+ * return 0 if they are equal
+ */
+static inline int timecmp(struct timespec a, struct timespec b)
+{
+ if (a.tv_sec != b.tv_sec) {
+ if (a.tv_sec > b.tv_sec) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ if (a.tv_nsec > b.tv_nsec) {
+ return 1;
+ } else if (a.tv_nsec < b.tv_nsec) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+/*
+ * this function needs to return an index (0 to 7)
+ * to a knet_host_defrag_buf. (-1 on errors)
+ */
+
+static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
+{
+ struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
+ int i, oldest;
+
+ /*
+ * check if there is a buffer already in use handling the same seq_num
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (src_host->defrag_buf[i].in_use) {
+ if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
+ return i;
+ }
+ }
+ }
+
+ /*
+ * If there is no buffer handling the current seq_num,
+ * it's either new or it's been reclaimed already.
+ * check if it's been reclaimed/seen before using the defrag circular
+ * buffer. If the pckt has been seen before, the buffer expired (ETIME)
+ * and there is no point to try to defrag it again.
+ */
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
+ errno = ETIME;
+ return -1;
+ }
+
+ /*
+ * register the pckt as seen
+ */
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
+
+ /*
+ * see if there is a free buffer
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (!src_host->defrag_buf[i].in_use) {
+ return i;
+ }
+ }
+
+ /*
+ * at this point there are no free buffers, the pckt is new
+ * and we need to reclaim a buffer; we take the one
+ * with the oldest timestamp. It's as good as any.
+ */
+
+ oldest = 0;
+
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
+ oldest = i;
+ }
+ }
+ src_host->defrag_buf[oldest].in_use = 0;
+ return oldest;
+}
+
+static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
+{
+ struct knet_host_defrag_buf *defrag_buf;
+ int defrag_buf_idx;
+
+ defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
+ if (defrag_buf_idx < 0) {
+ if (errno == ETIME) {
+ log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
+ }
+ return 1;
+ }
+
+ defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
+
+ /*
+ * if the buf is not in use, then make sure it's clean
+ */
+ if (!defrag_buf->in_use) {
+ memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
+ defrag_buf->in_use = 1;
+ defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
+ }
+
+ /*
+ * update timestamp on the buffer
+ */
+ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
+
+ /*
+ * check if we already received this fragment
+ */
+ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
+ /*
+ * if we have received this fragment and we didn't clear the buffer
+ * it means that we don't have all fragments yet
+ */
+ return 1;
+ }
+
+ /*
+ * we need to handle the last packet with kid gloves due to its different size
+ */
+
+ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
+ defrag_buf->last_frag_size = *len;
+
+ /*
+ * in the event that the last packet arrives first,
+ * we still don't know the offset vs the other fragments (based on MTU),
+ * so we store the fragment at the end of the buffer where it's safe
+ * and take a copy of the len so that we can restore its offset later.
+ * remember we can't use the local MTU for this calculation because pMTU
+ * can be asymmetric between the same hosts.
+ */
+ if (!defrag_buf->frag_size) {
+ defrag_buf->last_first = 1;
+ memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
+ inbuf->khp_data_userdata,
+ *len);
+ }
+ } else {
+ defrag_buf->frag_size = *len;
+ }
+
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
+ inbuf->khp_data_userdata, *len);
+
+ defrag_buf->frag_recv++;
+ defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
+
+ /*
+ * check if we received all the fragments
+ */
+ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
+ /*
+ * special case the last pckt
+ */
+
+ if (defrag_buf->last_first) {
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
+ defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
+ defrag_buf->last_frag_size);
+ }
+
+ /*
+ * recalculate packet length
+ */
+
+ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
+
+ /*
+ * copy the pckt back in the user data
+ */
+ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
+
+ /*
+ * free this buffer
+ */
+ defrag_buf->in_use = 0;
+ return 0;
+ }
+
+ return 1;
+}
+
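The reassembly above places fragment number seq at offset (seq - 1) * frag_size and recovers the total length as (frag_num - 1) * frag_size + last_frag_size. A worked check of that arithmetic, reusing the sample numbers from the sizing sketch earlier; all values are illustrative.

#include <assert.h>

int main(void)
{
	int frag_size = 1432; /* payload of every full fragment */
	int frag_num = 49; /* ceil(70000 / 1432) */
	int last_frag_size = 70000 - ((frag_num - 1) * frag_size);
	int len = ((frag_num - 1) * frag_size) + last_frag_size;

	assert(last_frag_size == 1264);
	assert(len == 70000);
	return 0;
}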
+static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg)
+{
+ int err = 0, savederrno = 0;
+ ssize_t outlen;
+ struct knet_host *src_host;
+ struct knet_link *src_link;
+ unsigned long long latency_last;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct timespec recvtime;
+ struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
+ unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
+ ssize_t len = msg->msg_len;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[1];
+ int8_t channel;
+ struct sockaddr_storage pckt_src;
+ seq_num_t recv_seq_num;
+ int wipe_bufs = 0;
+
+ if (knet_h->crypto_instance) {
+ if (crypto_authenticate_and_decrypt(knet_h,
+ (unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_decrypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
+ return;
+ }
+ len = outlen;
+ inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
+ }
+
+ if (len < (KNET_HEADER_SIZE + 1)) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len);
+ return;
+ }
+
+ if (inbuf->kh_version != KNET_HEADER_VERSION) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
+ return;
+ }
+
+ inbuf->kh_node = ntohs(inbuf->kh_node);
+ src_host = knet_h->host_index[inbuf->kh_node];
+ if (src_host == NULL) { /* host not found */
+ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
+ return;
+ }
+
+ src_link = NULL;
+
+ if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+ src_link = src_host->link +
+ (inbuf->khp_ping_link % KNET_MAX_LINK);
+ if (src_link->dynamic == KNET_LINK_DYNIP) {
+ /*
+ * cpyaddrport will only copy address and port of the incoming
+ * packet and strip extra bits such as flow and scopeid
+ */
+ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
+
+ if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
+ &pckt_src, sockaddr_len(&pckt_src)) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
+ src_host->host_id, src_link->link_id);
+ memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
+ if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
+ src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
+ src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
+ snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
+ snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
+ } else {
+ log_info(knet_h, KNET_SUB_RX,
+ "host: %u link: %u new connection established from: %s %s",
+ src_host->host_id, src_link->link_id,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ }
+ }
+ /*
+ * transport has already accepted the connection here
+ * otherwise we would not be receiving packets
+ */
+ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
+ }
+ }
+
+ switch (inbuf->kh_type) {
+ case KNET_HEADER_TYPE_HOST_INFO:
+ case KNET_HEADER_TYPE_DATA:
+ /*
+ * TODO: should we accept data even if we can't reply to the other node?
+ * how would that work with SCTP and guaranteed delivery?
+ */
+
+ if (!src_host->status.reachable) {
+ log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
+ //return;
+ }
+ inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
+ channel = inbuf->khp_data_channel;
+ src_host->got_data = 1;
+
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
+ }
+ return;
+ }
+
+ if (inbuf->khp_data_frag_num > 1) {
+ /*
+ * len as received from the socket also includes extra stuff
+ * that the defrag code doesn't care about. So strip it
+ * here and re-add it only for repadding once we are done
+ * defragging
+ */
+ len = len - KNET_HEADER_DATA_SIZE;
+ if (pckt_defrag(knet_h, inbuf, &len)) {
+ return;
+ }
+ len = len + KNET_HEADER_DATA_SIZE;
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (knet_h->enabled != 1) /* data forward is disabled */
+ break;
+
+ if (knet_h->dst_host_filter_fn) {
+ int host_idx;
+ int found = 0;
+
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ len - KNET_HEADER_DATA_SIZE,
+ KNET_NOTIFY_RX,
+ knet_h->host_id,
+ inbuf->kh_node,
+ &channel,
+ dst_host_ids,
+ &dst_host_ids_entries);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
+ return;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries)) {
+ log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ return;
+ }
+
+ /* check if we are dst for this packet */
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ if (dst_host_ids[host_idx] == knet_h->host_id) {
+ found = 1;
+ break;
+ }
+ }
+ if (!found) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
+ return;
+ }
+ }
+ }
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (!knet_h->sockfd[channel].in_use) {
+ log_debug(knet_h, KNET_SUB_RX,
+ "received packet for channel %d but there is no local sock connected",
+ channel);
+ return;
+ }
+
+ memset(iov_out, 0, sizeof(iov_out));
+ iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
+ iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
+
+ outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
+ if (outlen <= 0) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_RX,
+ outlen,
+ errno);
+ return;
+ }
+ if (outlen == iov_out[0].iov_len) {
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ }
+ } else { /* HOSTINFO */
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
+ }
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ return;
+ }
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ switch(knet_hostinfo->khi_type) {
+ case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
+ break;
+ case KNET_HOSTINFO_TYPE_LINK_TABLE:
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PING:
+ outlen = KNET_HEADER_PING_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PONG;
+ inbuf->kh_node = htons(knet_h->host_id);
+ recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
+
+ wipe_bufs = 0;
+
+ if (!inbuf->khp_ping_timed) {
+ /*
+ * we might be receiving this message from all links, but we want
+ * to process it only the first time
+ */
+ if (recv_seq_num != src_host->untimed_rx_seq_num) {
+ /*
+ * cache the untimed seq num
+ */
+ src_host->untimed_rx_seq_num = recv_seq_num;
+ /*
+ * if the host has received data in between
+ * untimed ping, then we don't need to wipe the bufs
+ */
+ if (src_host->got_data) {
+ src_host->got_data = 0;
+ wipe_bufs = 0;
+ } else {
+ wipe_bufs = 1;
+ }
+ }
+ _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
+ } else {
+ /*
+ * pings always arrive in bursts over all the links;
+ * catch the first of them to cache the seq num and
+ * avoid duplicate processing
+ */
+ if (recv_seq_num != src_host->timed_rx_seq_num) {
+ src_host->timed_rx_seq_num = recv_seq_num;
+
+ if (recv_seq_num == 0) {
+ _seq_num_lookup(src_host, recv_seq_num, 0, 1);
+ }
+ }
+ }
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pong:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pong;
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PONG:
+ clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
+ memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
+ timespec_diff(recvtime,
+ src_link->status.pong_last, &latency_last);
+
+ src_link->status.latency =
+ ((src_link->status.latency * src_link->latency_exp) +
+ ((latency_last / 1000llu) *
+ (src_link->latency_fix - src_link->latency_exp))) /
+ src_link->latency_fix;
+
+ if (src_link->status.latency < src_link->pong_timeout) {
+ if (!src_link->status.connected) {
+ if (src_link->received_pong >= src_link->pong_count) {
+ log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
+ src_host->host_id, src_link->link_id);
+ _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+ } else {
+ src_link->received_pong++;
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
+ src_host->host_id, src_link->link_id, src_link->received_pong);
+ }
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD:
+ outlen = KNET_HEADER_PMTUD_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+ inbuf->kh_node = htons(knet_h->host_id);
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pmtud:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pmtud;
+ break;
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD_REPLY:
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
+ break;
+ }
+ src_link->last_recv_mtu = inbuf->khp_pmtud_size;
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ break;
+ default:
+ return;
+ }
+}
+
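The PONG branch above smooths the measured latency with a fixed-point exponential moving average: the old value keeps a weight of latency_exp / latency_fix and the new sample contributes (latency_fix - latency_exp) / latency_fix. A sketch of that update in isolation; the weight semantics are inferred from the expression above, so treat them as assumptions.

/* e.g. fix = 8, exp = 7: keep 7/8 of the history, blend in 1/8 of the sample */
static unsigned long long smooth_latency(unsigned long long old_latency_us,
					 unsigned long long sample_us,
					 unsigned long long latency_fix,
					 unsigned long long latency_exp)
{
	return ((old_latency_us * latency_exp) +
		(sample_us * (latency_fix - latency_exp))) / latency_fix;
}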
+static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
+{
+ int err, savederrno;
+ int i, msg_recv, transport;
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
+ return;
+ }
+
+ if (_is_valid_fd(knet_h, sockfd) < 1) {
+ /*
+ * this is normal if the fd got an event but the link was removed
+ * by another thread before we could grab the read lock
+ */
+ goto exit_unlock;
+ }
+
+ transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
+
+ /*
+ * reset msg_namelen to buffer size because after recvmmsg
+ * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
+ */
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ }
+
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ savederrno = errno;
+
+ /*
+ * WARNING: the man page for recvmmsg is wrong. The kernel implementation behaves as follows:
+ * recvmmsg can return:
+ * -1 on error
+ * 0 if the previous run of recvmmsg recorded an error on the socket
+ * N number of messages (see exception below).
+ *
+ * If there is an error from recvmsg after receiving one or more frames, the recvmmsg
+ * loop is interrupted, the error is recorded on the socket (getsockopt(SO_ERROR))
+ * and it will be visible in the next run.
+ *
+ * Need to be careful how we handle errors at this stage.
+ *
+ * error messages need to be handled on a per transport/protocol base
+ * at this point we have different layers of error handling
+ * - msg_recv < 0 -> error from this run
+ * msg_recv = 0 -> error from previous run and error on socket needs to be cleared
+ * - per-transport message data
+ * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
+ * but for UDP it is perfectly legal to receive a 0-byte message.. go figure
+ * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
+ * errno set. That means the error api needs to be able to abort the loop below.
+ */
+
+ if (msg_recv <= 0) {
+ knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
+ goto exit_unlock;
+ }
+
+ for (i = 0; i < msg_recv; i++) {
+ err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
+
+ /*
+ * TODO: make this section silent once we are confident
+ * all protocols packet handlers are good
+ */
+
+ switch(err) {
+ case -1: /* on error */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
+ goto exit_unlock;
+ break;
+ case 0: /* packet is not data and we should continue the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
+ break;
+ case 1: /* packet is not data and we should STOP the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
+ goto exit_unlock;
+ break;
+ case 2: /* packet is data and should be parsed as such */
+ _parse_recv_from_links(knet_h, sockfd, &msg[i]);
+ break;
+ }
+ }
+
+exit_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+}
+
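Per the recvmmsg comment above, a return of 0 means that a previous run latched an error on the socket, and the per-transport error handler is expected to read (and thereby clear) it. The underlying idiom is getsockopt(SO_ERROR), sketched below; this is not the actual transport_rx_sock_error() implementation, just the mechanism it would rely on.

#include <sys/socket.h>

static int read_and_clear_sock_error(int sockfd)
{
	int so_error = 0;
	socklen_t optlen = sizeof(so_error);

	if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &so_error, &optlen) < 0)
		return -1;

	return so_error; /* reading SO_ERROR also clears it on the socket */
}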
+void *_handle_recv_from_links_thread(void *data)
+{
+ int i, nev;
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+
+ memset(&msg, 0, sizeof(msg));
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
+ iov_in[i].iov_len = KNET_DATABUFSIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+ for (i = 0; i < nev; i++) {
+ _handle_recv_from_links(knet_h, events[i].data.fd, msg);
+ }
+ }
+
+ return NULL;
+}
diff --git a/libknet/threads_rx.h b/libknet/threads_rx.h
new file mode 100644
index 00000000..fec6e5f1
--- /dev/null
+++ b/libknet/threads_rx.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_RX_H__
+#define __THREADS_RX_H__
+
+void *_handle_send_to_links_thread(void *data);
+void *_handle_recv_from_links_thread(void *data);
+
+#endif
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
new file mode 100644
index 00000000..37375346
--- /dev/null
+++ b/libknet/threads_tx.c
@@ -0,0 +1,1302 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <pthread.h>
+#include <math.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/uio.h>
+
+#include "crypto.h"
+#include "compat.h"
+#include "host.h"
+#include "link.h"
+#include "logging.h"
+#include "transports.h"
+#include "threads_common.h"
+#include "threads_heartbeat.h"
+#include "threads_send_recv.h"
+#include "netutils.h"
+
+/*
+ * SEND
+ */
+
+static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out)
+{
+ int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress;
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ int err = 0, savederrno = 0;
+
+ memset(&msg, 0, sizeof(msg)); /* zero the whole array, not just the first entry */
+
+ for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
+
+ msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num;
+ sent_msgs = 0;
+ prev_sent = 0;
+ progress = 1;
+
+retry:
+ msg_idx = 0;
+
+ while (msg_idx < msgs_to_send) {
+ memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr));
+ msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr;
+ msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent];
+ msg[msg_idx].msg_hdr.msg_iovlen = 1;
+ msg_idx++;
+ }
+
+ sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
+ msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL);
+ savederrno = errno;
+
+ err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ goto out_unlock;
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry;
+ break;
+ }
+
+ if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) {
+ if ((sent_msgs) || (progress)) {
+ msgs_to_send = msg_idx - sent_msgs;
+ prev_sent = prev_sent + sent_msgs;
+ if (sent_msgs) {
+ progress = 1;
+ } else {
+ progress = 0;
+ }
+ log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
+ sent_msgs, msg_idx,
+ dst_host->name, dst_host->host_id,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
+ dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
+ dst_host->link[dst_host->active_links[link_idx]].link_id);
+ goto retry;
+ }
+ if (!progress) {
+ savederrno = EAGAIN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
+ (dst_host->active_link_entries > 1)) {
+ uint8_t cur_link_id = dst_host->active_links[0];
+
+ memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
+ dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
+
+ break;
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync)
+{
+ ssize_t outlen, frag_len;
+ struct knet_host *dst_host;
+ uint16_t dst_host_ids_temp[KNET_MAX_HOST];
+ size_t dst_host_ids_entries_temp = 0;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[PCKT_FRAG_MAX];
+ uint8_t frag_idx;
+ unsigned int temp_data_mtu;
+ int host_idx;
+ int send_mcast = 0;
+ struct knet_header *inbuf;
+ int savederrno = 0;
+ int err = 0;
+ seq_num_t tx_seq_num;
+
+ inbuf = knet_h->recv_from_sock_buf[buf_idx];
+
+ if ((knet_h->enabled != 1) &&
+ (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
+ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
+ savederrno = ECANCELED;
+ err = -1;
+ goto out_unlock;
+ }
+
+ /*
+ * move this into a separate function to expand on
+ * extra switching rules
+ */
+ switch(inbuf->kh_type) {
+ case KNET_HEADER_TYPE_DATA:
+ if (knet_h->dst_host_filter_fn) {
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ inlen,
+ KNET_NOTIFY_TX,
+ knet_h->host_id,
+ knet_h->host_id,
+ &channel,
+ dst_host_ids_temp,
+ &dst_host_ids_entries_temp);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
+ savederrno = EFAULT;
+ err = -1;
+ goto out_unlock;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries_temp)) {
+ log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
+ savederrno = EINVAL;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_HOST_INFO:
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
+ dst_host_ids_entries_temp = 1;
+ knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
+ }
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
+ savederrno = ENOMSG;
+ err = -1;
+ goto out_unlock;
+ break;
+ }
+
+ if (is_sync) {
+ if ((bcast) ||
+ ((!bcast) && (dst_host_ids_entries_temp > 1))) {
+ log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
+ savederrno = E2BIG;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ /*
+ * check destination hosts before spending time
+ * fragmenting/encrypting packets, to save
+ * time processing data for unreachable hosts.
+ * for unicast, also remap the destination data
+ * to skip unreachable hosts.
+ */
+
+ if (!bcast) {
+ dst_host_ids_entries = 0;
+ for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
+ if (!dst_host) {
+ continue;
+ }
+ if (dst_host->status.reachable) {
+ dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
+ dst_host_ids_entries++;
+ }
+ }
+ if (!dst_host_ids_entries) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ } else {
+ send_mcast = 0;
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ send_mcast = 1;
+ break;
+ }
+ }
+ if (!send_mcast) {
+ savederrno = EHOSTDOWN;
+ err = -1;
+ goto out_unlock;
+ }
+ }
+
+ if (!knet_h->data_mtu) {
+ /*
+ * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
+ */
+ log_debug(knet_h, KNET_SUB_TX,
+ "Received data packet but data MTU is still unknown."
+ " Packet might not be delivered."
+ " Assuming mininum IPv4 mtu (%d)",
+ KNET_PMTUD_MIN_MTU_V4);
+ temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
+ } else {
+ /*
+ * take a copy of the mtu to avoid value changing under
+ * our feet while we are sending a fragmented pckt
+ */
+ temp_data_mtu = knet_h->data_mtu;
+ }
+
+ /*
+ * prepare the outgoing buffers
+ */
+
+ frag_len = inlen;
+ frag_idx = 0;
+
+ inbuf->khp_data_bcast = bcast;
+ inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
+ inbuf->khp_data_channel = channel;
+
+ if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
+ goto out_unlock;
+ }
+ knet_h->tx_seq_num++;
+ /*
+ * force seq_num 0 to detect a node that has crashed and is rejoining
+ * the knet instance. seq_num 0 will clear the buffers in the RX
+ * thread
+ */
+ if (knet_h->tx_seq_num == 0) {
+ knet_h->tx_seq_num++;
+ }
+ /*
+ * cache the value in locked context
+ */
+ tx_seq_num = knet_h->tx_seq_num;
+ knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(knet_h->tx_seq_num);
+ pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
+
+ /*
+ * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
+ * pckts.
+ * this solves 2 problems:
+ * 1) on TX socket overloads we generate extra pings to keep links alive
+ * 2) in a 3+ node setup, where all the traffic is flowing between nodes 1 and 2,
+ * node 3+ will be able to keep in sync on the TX seq_num even without
+ * receiving traffic or pings in between. This avoids issues with
+ * rollover of the circular buffer
+ */
+
+ if (tx_seq_num % (SEQ_MAX / 8) == 0) {
+ _send_pings(knet_h, 0);
+ }
+
+ while (frag_idx < inbuf->khp_data_frag_num) {
+ /*
+ * set the iov_base
+ */
+ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
+
+ /*
+ * set the len
+ */
+ if (frag_len > temp_data_mtu) {
+ iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
+ } else {
+ iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
+ }
+
+ /*
+ * copy the frag info on all buffers
+ */
+ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
+
+ knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
+ knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
+
+ memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata,
+ inbuf->khp_data_userdata + (temp_data_mtu * frag_idx),
+ iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE);
+
+ frag_len = frag_len - temp_data_mtu;
+ frag_idx++;
+ }
+
+ if (knet_h->crypto_instance) {
+ frag_idx = 0;
+ while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) {
+ if (crypto_encrypt_and_sign(
+ knet_h,
+ (const unsigned char *)knet_h->send_to_links_buf[frag_idx],
+ iov_out[frag_idx].iov_len,
+ knet_h->send_to_links_buf_crypt[frag_idx],
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet");
+ savederrno = ECHILD;
+ err = -1;
+ goto out_unlock;
+ }
+ iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
+ iov_out[frag_idx].iov_len = outlen;
+ frag_idx++;
+ }
+ }
+
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ dst_host = knet_h->host_index[dst_host_ids[host_idx]];
+
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ } else {
+ for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+ if (dst_host->status.reachable) {
+ err = _dispatch_to_links(knet_h, dst_host, iov_out);
+ savederrno = errno;
+ if (err) {
+ goto out_unlock;
+ }
+ }
+ }
+ }
+
+out_unlock:
+ errno = savederrno;
+ return err;
+}
+
+int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
+{
+ int savederrno = 0, err = 0;
+
+ if (!knet_h) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len <= 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (buff_len > KNET_MAX_PACKET_SIZE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (channel >= KNET_DATAFD_MAX) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
+ strerror(savederrno));
+ errno = savederrno;
+ return -1;
+ }
+
+ if (!knet_h->sockfd[channel].in_use) {
+ savederrno = EINVAL;
+ err = -1;
+ goto out;
+ }
+
+ savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
+ if (savederrno) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
+ strerror(savederrno));
+ err = -1;
+ goto out;
+ }
+
+ knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA;
+ memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len);
+ err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1);
+ savederrno = errno;
+
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+
+out:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+
+ errno = savederrno;
+ return err;
+}
+
+static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
+{
+ ssize_t inlen = 0;
+ struct iovec iov_in;
+ int msg_recv, i;
+ int savederrno = 0, docallback = 0;
+
+ if ((channel >= 0) &&
+ (channel < KNET_DATAFD_MAX) &&
+ (!knet_h->sockfd[channel].is_socket)) {
+ memset(&iov_in, 0, sizeof(iov_in));
+ iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata;
+ iov_in.iov_len = KNET_MAX_PACKET_SIZE;
+
+ inlen = readv(sockfd, &iov_in, 1);
+
+ if (inlen <= 0) {
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+
+ msg_recv = 1;
+ knet_h->recv_from_sock_buf[0]->kh_type = type;
+ _parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
+ } else {
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ if (msg_recv < 0) {
+ inlen = msg_recv;
+ savederrno = errno;
+ docallback = 1;
+ goto out;
+ }
+ for (i = 0; i < msg_recv; i++) {
+ inlen = msg[i].msg_len;
+ if (inlen == 0) {
+ savederrno = 0;
+ docallback = 1;
+ goto out;
+ break;
+ }
+ knet_h->recv_from_sock_buf[i]->kh_type = type;
+ _parse_recv_from_sock(knet_h, i, inlen, channel, 0);
+ }
+ }
+
+out:
+ if (inlen < 0) {
+ struct epoll_event ev;
+
+ memset(&ev, 0, sizeof(struct epoll_event));
+
+ if (epoll_ctl(knet_h->send_to_links_epollfd,
+ EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
+ log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
+ knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
+ } else {
+ knet_h->sockfd[channel].has_error = 1;
+ }
+
+ }
+
+ if (docallback) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_TX,
+ inlen,
+ savederrno);
+ }
+}
+
+void *_handle_send_to_links_thread(void *data)
+{
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+ int i, nev, type;
+ int8_t channel;
+
+ memset(&msg, 0, sizeof(msg)); /* zero the whole array, not just the first entry */
+
+ /* preparing data buffer */
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata;
+ iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+ knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0;
+ knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id);
+
+ knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
+ knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
+ knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); /* events[] holds at most KNET_EPOLL_MAX_EVENTS entries */
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
+ continue;
+ }
+
+ for (i = 0; i < nev; i++) {
+ if (events[i].data.fd == knet_h->hostsockfd[0]) {
+ type = KNET_HEADER_TYPE_HOST_INFO;
+ channel = -1;
+ } else {
+ type = KNET_HEADER_TYPE_DATA;
+ for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
+ if ((knet_h->sockfd[channel].in_use) &&
+ (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
+ break;
+ }
+ }
+ }
+ if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
+ continue;
+ }
+ _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type);
+ pthread_mutex_unlock(&knet_h->tx_mutex);
+ }
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+ }
+
+ return NULL;
+}
+
+/*
+ * RECV
+ */
+
+/*
+ * return 1 if a > b
+ * return -1 if b > a
+ * return 0 if they are equal
+ */
+static inline int timecmp(struct timespec a, struct timespec b)
+{
+ if (a.tv_sec != b.tv_sec) {
+ if (a.tv_sec > b.tv_sec) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ if (a.tv_nsec > b.tv_nsec) {
+ return 1;
+ } else if (a.tv_nsec < b.tv_nsec) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+/*
+ * this function needs to return an index (0 to 7)
+ * to a knet_host_defrag_buf. (-1 on errors)
+ */
+
+static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
+{
+ struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
+ int i, oldest;
+
+ /*
+ * check if there is a buffer already in use handling the same seq_num
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (src_host->defrag_buf[i].in_use) {
+ if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
+ return i;
+ }
+ }
+ }
+
+ /*
+ * If there is no buffer handling the current seq_num,
+ * it's either new or it's been reclaimed already.
+ * check if it's been reclaimed/seen before using the defrag circular
+ * buffer. If the pckt has been seen before, the buffer expired (ETIME)
+ * and there is no point to try to defrag it again.
+ */
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
+ errno = ETIME;
+ return -1;
+ }
+
+ /*
+ * register the pckt as seen
+ */
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
+
+ /*
+ * see if there is a free buffer
+ */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (!src_host->defrag_buf[i].in_use) {
+ return i;
+ }
+ }
+
+ /*
+ * at this point there are no free buffers, the pckt is new
+ * and we need to reclaim a buffer; we take the one
+ * with the oldest timestamp. It's as good as any.
+ */
+
+ oldest = 0;
+
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
+ oldest = i;
+ }
+ }
+ src_host->defrag_buf[oldest].in_use = 0;
+ return oldest;
+}
+
+static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
+{
+ struct knet_host_defrag_buf *defrag_buf;
+ int defrag_buf_idx;
+
+ defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
+ if (defrag_buf_idx < 0) {
+ if (errno == ETIME) {
+ log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
+ }
+ return 1;
+ }
+
+ defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
+
+ /*
+ * if the buf is not in use, then make sure it's clean
+ */
+ if (!defrag_buf->in_use) {
+ memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
+ defrag_buf->in_use = 1;
+ defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
+ }
+
+ /*
+ * update timestamp on the buffer
+ */
+ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
+
+ /*
+ * check if we already received this fragment
+ */
+ if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
+ /*
+ * if we have already received this fragment and we didn't clear the
+ * buffer, it means we don't have all the fragments yet; drop the duplicate
+ */
+ return 1;
+ }
+
+ /*
+ * we need to handle the last packet with kid gloves due to its different size
+ */
+
+ if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
+ defrag_buf->last_frag_size = *len;
+
+ /*
+ * in the event that the last packet arrives first,
+ * we still don't know its offset vs the other fragments (based on MTU),
+ * so we store the fragment at the end of the buffer where it's safe
+ * and take a copy of the len so that we can restore its offset later.
+ * remember we can't use the local MTU for this calculation because pMTU
+ * can be asymmetric between the same hosts.
+ */
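+ /*
+ * worked example (hypothetical sizes): for a 3-fragment packet whose
+ * 800-byte last fragment arrives before any 1200-byte full fragment,
+ * it is parked at buf + (KNET_MAX_PACKET_SIZE - 800); once frag_size
+ * is known to be 1200, the final pass below moves it to
+ * buf + (2 * 1200), right after the other two fragments.
+ */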
+ if (!defrag_buf->frag_size) {
+ defrag_buf->last_first = 1;
+ memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
+ inbuf->khp_data_userdata,
+ *len);
+ }
+ } else {
+ defrag_buf->frag_size = *len;
+ }
+
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
+ inbuf->khp_data_userdata, *len);
+
+ defrag_buf->frag_recv++;
+ defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
+
+ /*
+ * check if we received all the fragments
+ */
+ if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
+ /*
+ * special case the last pckt
+ */
+
+ if (defrag_buf->last_first) {
+ memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
+ defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
+ defrag_buf->last_frag_size);
+ }
+
+ /*
+ * recalculate packet length
+ */
+
+ *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
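+ /*
+ * e.g. (hypothetical sizes): 4 fragments with frag_size 1397 and a
+ * 200-byte tail give *len = (3 * 1397) + 200 = 4391 bytes of userdata
+ */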
+
+ /*
+ * copy the pckt back in the user data
+ */
+ memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
+
+ /*
+ * free this buffer
+ */
+ defrag_buf->in_use = 0;
+ return 0;
+ }
+
+ return 1;
+}
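+
+/*
+ * note on the return convention: pckt_defrag returns 0 only when the
+ * packet has been fully reassembled into inbuf (with *len updated to
+ * the reassembled size); 1 means the fragment was consumed, dropped as
+ * a duplicate, or expired, and the caller should not deliver anything.
+ */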
+
+static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg)
+{
+ int err = 0, savederrno = 0;
+ ssize_t outlen;
+ struct knet_host *src_host;
+ struct knet_link *src_link;
+ unsigned long long latency_last;
+ uint16_t dst_host_ids[KNET_MAX_HOST];
+ size_t dst_host_ids_entries = 0;
+ int bcast = 1;
+ struct timespec recvtime;
+ struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
+ unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
+ ssize_t len = msg->msg_len;
+ struct knet_hostinfo *knet_hostinfo;
+ struct iovec iov_out[1];
+ int8_t channel;
+ struct sockaddr_storage pckt_src;
+ seq_num_t recv_seq_num;
+ int wipe_bufs = 0;
+
+ if (knet_h->crypto_instance) {
+ if (crypto_authenticate_and_decrypt(knet_h,
+ (unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_decrypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
+ return;
+ }
+ len = outlen;
+ inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
+ }
+
+ if (len < (KNET_HEADER_SIZE + 1)) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len);
+ return;
+ }
+
+ if (inbuf->kh_version != KNET_HEADER_VERSION) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
+ return;
+ }
+
+ inbuf->kh_node = ntohs(inbuf->kh_node);
+ src_host = knet_h->host_index[inbuf->kh_node];
+ if (src_host == NULL) { /* host not found */
+ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
+ return;
+ }
+
+ src_link = NULL;
+
+ if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+ src_link = src_host->link +
+ (inbuf->khp_ping_link % KNET_MAX_LINK);
+ if (src_link->dynamic == KNET_LINK_DYNIP) {
+ /*
+ * cpyaddrport will only copy address and port of the incoming
+ * packet and strip extra bits such as flow and scopeid
+ */
+ cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
+
+ if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
+ &pckt_src, sockaddr_len(&pckt_src)) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
+ src_host->host_id, src_link->link_id);
+ memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
+ if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
+ src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
+ src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
+ snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
+ snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
+ } else {
+ log_info(knet_h, KNET_SUB_RX,
+ "host: %u link: %u new connection established from: %s %s",
+ src_host->host_id, src_link->link_id,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ }
+ }
+ /*
+ * transport has already accepted the connection here
+ * otherwise we would not be receiving packets
+ */
+ knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
+ }
+ }
+
+ switch (inbuf->kh_type) {
+ case KNET_HEADER_TYPE_HOST_INFO:
+ case KNET_HEADER_TYPE_DATA:
+ /*
+ * TODO: should we accept data even if we can't reply to the other node?
+ * how would that work with SCTP and guaranteed delivery?
+ */
+
+ if (!src_host->status.reachable) {
+ log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
+ //return;
+ }
+ inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
+ channel = inbuf->khp_data_channel;
+ src_host->got_data = 1;
+
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
+ }
+ return;
+ }
+
+ if (inbuf->khp_data_frag_num > 1) {
+ /*
+ * len as received from the socket also includes extra stuff
+ * that the defrag code doesn't care about. So strip it
+ * here and re-add it for repadding once we are done
+ * defragging
+ */
+ len = len - KNET_HEADER_DATA_SIZE;
+ if (pckt_defrag(knet_h, inbuf, &len)) {
+ return;
+ }
+ len = len + KNET_HEADER_DATA_SIZE;
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (knet_h->enabled != 1) /* data forward is disabled */
+ break;
+
+ if (knet_h->dst_host_filter_fn) {
+ int host_idx;
+ int found = 0;
+
+ bcast = knet_h->dst_host_filter_fn(
+ knet_h->dst_host_filter_fn_private_data,
+ (const unsigned char *)inbuf->khp_data_userdata,
+ len - KNET_HEADER_DATA_SIZE,
+ KNET_NOTIFY_RX,
+ knet_h->host_id,
+ inbuf->kh_node,
+ &channel,
+ dst_host_ids,
+ &dst_host_ids_entries);
+ if (bcast < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
+ return;
+ }
+
+ if ((!bcast) && (!dst_host_ids_entries)) {
+ log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
+ return;
+ }
+
+ /* check if we are dst for this packet */
+ if (!bcast) {
+ for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
+ if (dst_host_ids[host_idx] == knet_h->host_id) {
+ found = 1;
+ break;
+ }
+ }
+ if (!found) {
+ log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
+ return;
+ }
+ }
+ }
+ }
+
+ if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
+ if (!knet_h->sockfd[channel].in_use) {
+ log_debug(knet_h, KNET_SUB_RX,
+ "received packet for channel %d but there is no local sock connected",
+ channel);
+ return;
+ }
+
+ memset(iov_out, 0, sizeof(iov_out));
+ iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
+ iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
+
+ outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
+ if (outlen <= 0) {
+ knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
+ knet_h->sockfd[channel].sockfd[0],
+ channel,
+ KNET_NOTIFY_RX,
+ outlen,
+ errno);
+ return;
+ }
+ if (outlen == iov_out[0].iov_len) {
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
+ }
+ } else { /* HOSTINFO */
+ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
+ if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
+ bcast = 0;
+ knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
+ }
+ if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
+ return;
+ }
+ _seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
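+ /*
+ * note: no per-type payload processing happens in this path yet;
+ * both known hostinfo types below fall through as no-ops
+ */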
+ switch(knet_hostinfo->khi_type) {
+ case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
+ break;
+ case KNET_HOSTINFO_TYPE_LINK_TABLE:
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PING:
+ outlen = KNET_HEADER_PING_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PONG;
+ inbuf->kh_node = htons(knet_h->host_id);
+ recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
+
+ wipe_bufs = 0;
+
+ if (!inbuf->khp_ping_timed) {
+ /*
+ * we might be receiving this message from all links, but we want
+ * to process it only the first time
+ */
+ if (recv_seq_num != src_host->untimed_rx_seq_num) {
+ /*
+ * cache the untimed seq num
+ */
+ src_host->untimed_rx_seq_num = recv_seq_num;
+ /*
+ * if the host has received data in between
+ * untimed ping, then we don't need to wipe the bufs
+ */
+ if (src_host->got_data) {
+ src_host->got_data = 0;
+ wipe_bufs = 0;
+ } else {
+ wipe_bufs = 1;
+ }
+ }
+ _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
+ } else {
+ /*
+ * pings always arrive in bursts over all the links;
+ * catch the first of them to cache the seq num and
+ * avoid duplicate processing
+ */
+ if (recv_seq_num != src_host->timed_rx_seq_num) {
+ src_host->timed_rx_seq_num = recv_seq_num;
+
+ if (recv_seq_num == 0) {
+ _seq_num_lookup(src_host, recv_seq_num, 0, 1);
+ }
+ }
+ }
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pong:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pong;
+ break;
+ }
+ }
+ break;
+ case KNET_HEADER_TYPE_PONG:
+ clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
+ memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
+ timespec_diff(recvtime,
+ src_link->status.pong_last, &latency_last);
+
+ src_link->status.latency =
+ ((src_link->status.latency * src_link->latency_exp) +
+ ((latency_last / 1000llu) *
+ (src_link->latency_fix - src_link->latency_exp))) /
+ src_link->latency_fix;
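+ /*
+ * the expression above is a weighted moving average. worked example
+ * with hypothetical weights: latency_fix = 2048 and latency_exp = 1024
+ * weigh the old value and the new sample equally, so a stored latency
+ * of 500us and a 700us sample give
+ * ((500 * 1024) + (700 * 1024)) / 2048 = 600us.
+ */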
+
+ if (src_link->status.latency < src_link->pong_timeout) {
+ if (!src_link->status.connected) {
+ if (src_link->received_pong >= src_link->pong_count) {
+ log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
+ src_host->host_id, src_link->link_id);
+ _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
+ } else {
+ src_link->received_pong++;
+ log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
+ src_host->host_id, src_link->link_id, src_link->received_pong);
+ }
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD:
+ outlen = KNET_HEADER_PMTUD_SIZE;
+ inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
+ inbuf->kh_node = htons(knet_h->host_id);
+
+ if (knet_h->crypto_instance) {
+ if (crypto_encrypt_and_sign(knet_h,
+ (const unsigned char *)inbuf,
+ len,
+ knet_h->recv_from_links_buf_crypt,
+ &outlen) < 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
+ break;
+ }
+ outbuf = knet_h->recv_from_links_buf_crypt;
+ }
+
+retry_pmtud:
+ len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
+ (struct sockaddr *) &src_link->dst_addr,
+ sizeof(struct sockaddr_storage));
+ savederrno = errno;
+ if (len != outlen) {
+ err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
+ switch(err) {
+ case -1: /* unrecoverable error */
+ log_debug(knet_h, KNET_SUB_RX,
+ "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
+ src_link->outsock, errno, strerror(errno),
+ src_link->status.src_ipaddr, src_link->status.src_port,
+ src_link->status.dst_ipaddr, src_link->status.dst_port);
+ break;
+ case 0: /* ignore error and continue */
+ break;
+ case 1: /* retry to send those same data */
+ goto retry_pmtud;
+ break;
+ }
+ }
+
+ break;
+ case KNET_HEADER_TYPE_PMTUD_REPLY:
+ if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
+ break;
+ }
+ src_link->last_recv_mtu = inbuf->khp_pmtud_size;
+ pthread_cond_signal(&knet_h->pmtud_cond);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ break;
+ default:
+ return;
+ }
+}
+
+static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
+{
+ int err, savederrno;
+ int i, msg_recv, transport;
+
+ if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
+ return;
+ }
+
+ if (_is_valid_fd(knet_h, sockfd) < 1) {
+ /*
+ * this is normal if an fd got an event but the link was removed
+ * by another thread before we grabbed the read lock
+ */
+ goto exit_unlock;
+ }
+
+ transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
+
+ /*
+ * reset msg_namelen to buffer size because after recvmmsg
+ * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
+ */
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ }
+
+ msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
+ savederrno = errno;
+
+ /*
+ * WARNING: the man page for recvmmsg is wrong. the kernel implementation
+ * behaves as follows. recvmmsg can return:
+ * -1 on error
+ * 0 if the previous run of recvmmsg recorded an error on the socket
+ * N number of messages (see exception below).
+ *
+ * If there is an error from recvmsg after receiving one or more frames, the
+ * recvmmsg loop is interrupted, the error is recorded on the socket
+ * (getsockopt(SO_ERROR)) and it will be visible in the next run.
+ *
+ * Need to be careful how we handle errors at this stage.
+ *
+ * error messages need to be handled on a per-transport/protocol basis.
+ * at this point we have different layers of error handling:
+ * - msg_recv < 0 -> error from this run
+ * - msg_recv = 0 -> error from the previous run; the error on the socket
+ * needs to be cleared
+ * - per-transport message data
+ * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION, or msg_len == 0
+ * meaning EOF for SCTP, while for UDP it is perfectly legal to receive
+ * a 0-byte message.. go figure
+ * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
+ * errno set. That means the error api needs to be able to abort the loop below.
+ */
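+ /*
+ * illustrative sketch (not the actual transport code): a per-transport
+ * rx_sock_error handler can fetch and clear the recorded error with
+ * something like
+ *
+ * int sockerr = 0;
+ * socklen_t errlen = sizeof(sockerr);
+ * getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen);
+ *
+ * since reading SO_ERROR returns the pending error and resets it to zero.
+ */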
+
+ if (msg_recv <= 0) {
+ knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
+ goto exit_unlock;
+ }
+
+ for (i = 0; i < msg_recv; i++) {
+ err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
+
+ /*
+ * TODO: make this section silent once we are confident
+ * all protocol packet handlers are good
+ */
+
+ switch(err) {
+ case -1: /* on error */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
+ goto exit_unlock;
+ break;
+ case 0: /* packet is not data and we should continue the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
+ break;
+ case 1: /* packet is not data and we should STOP the packet process loop */
+ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
+ goto exit_unlock;
+ break;
+ case 2: /* packet is data and should be parsed as such */
+ _parse_recv_from_links(knet_h, sockfd, &msg[i]);
+ break;
+ }
+ }
+
+exit_unlock:
+ pthread_rwlock_unlock(&knet_h->global_rwlock);
+}
+
+void *_handle_recv_from_links_thread(void *data)
+{
+ int i, nev;
+ knet_handle_t knet_h = (knet_handle_t) data;
+ struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
+ struct sockaddr_storage address[PCKT_FRAG_MAX];
+ struct mmsghdr msg[PCKT_FRAG_MAX];
+ struct iovec iov_in[PCKT_FRAG_MAX];
+
+ memset(&msg, 0, sizeof(msg));
+
+ for (i = 0; i < PCKT_FRAG_MAX; i++) {
+ iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
+ iov_in[i].iov_len = KNET_DATABUFSIZE;
+
+ memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
+
+ msg[i].msg_hdr.msg_name = &address[i];
+ msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
+ msg[i].msg_hdr.msg_iov = &iov_in[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ while (!shutdown_in_progress(knet_h)) {
+ nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
+
+ for (i = 0; i < nev; i++) {
+ _handle_recv_from_links(knet_h, events[i].data.fd, msg);
+ }
+ }
+
+ return NULL;
+}
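+
+/*
+ * usage sketch (illustrative; assumes a pthread_t field for the RX
+ * thread on the handle, named here hypothetically): handle setup code
+ * is expected to start this thread roughly as
+ *
+ * pthread_create(&knet_h->recv_from_links_thread, NULL,
+ * _handle_recv_from_links_thread, (void *)knet_h);
+ */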
diff --git a/libknet/threads_tx.h b/libknet/threads_tx.h
new file mode 100644
index 00000000..fec6e5f1
--- /dev/null
+++ b/libknet/threads_tx.h
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ * Federico Simoncelli <fsimon@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __THREADS_TX_H__
+#define __THREADS_TX_H__
+
+void *_handle_send_to_links_thread(void *data);
+void *_handle_recv_from_links_thread(void *data);
+
+#endif