diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 61e2862d..85fc9965 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,1064 +1,1065 @@
 /*
  * Copyright (C) 2012-2020 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 #include <sys/uio.h>
 #include <pthread.h>
 
 #include "compat.h"
 #include "compress.h"
 #include "crypto.h"
 #include "host.h"
 #include "links.h"
 #include "links_acl.h"
 #include "logging.h"
 #include "transports.h"
 #include "transport_common.h"
 #include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_rx.h"
 #include "netutils.h"
 
 /*
  * RECV
  */
 
 /*
  *  return 1 if a > b
  *  return -1 if b > a
  *  return 0 if they are equal
  */
 static inline int timecmp(struct timespec a, struct timespec b)
 {
 	if (a.tv_sec != b.tv_sec) {
 		if (a.tv_sec > b.tv_sec) {
 			return 1;
 		} else {
 			return -1;
 		}
 	} else {
 		if (a.tv_nsec > b.tv_nsec) {
 			return 1;
 		} else if (a.tv_nsec < b.tv_nsec) {
 			return -1;
 		} else {
 			return 0;
 		}
 	}
 }
 
 /*
  * this functions needs to return an index (0 to 7)
  * to a knet_host_defrag_buf. (-1 on errors)
  */
 
 static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
 {
 	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
 	int i, oldest;
 
 	/*
 	 * check if there is a buffer already in use handling the same seq_num
 	 */
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (src_host->defrag_buf[i].in_use) {
 			if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
 				return i;
 			}
 		}
 	}
 
 	/*
 	 * If there is no buffer that's handling the current seq_num
 	 * either it's new or it's been reclaimed already.
 	 * check if it's been reclaimed/seen before using the defrag circular
 	 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
 	 * and there is no point to try to defrag it again.
 	 */
 	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
 		errno = ETIME;
 		return -1;
 	}
 
 	/*
 	 * register the pckt as seen
 	 */
 	_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
 
 	/*
 	 * see if there is a free buffer
 	 */
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (!src_host->defrag_buf[i].in_use) {
 			return i;
 		}
 	}
 
 	/*
 	 * at this point, there are no free buffers, the pckt is new
 	 * and we need to reclaim a buffer, and we will take the one
 	 * with the oldest timestamp. It's as good as any.
 	 */
 
 	oldest = 0;
 
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
 			oldest = i;
 		}
 	}
 	src_host->defrag_buf[oldest].in_use = 0;
 	return oldest;
 }
 
 static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
 {
 	struct knet_host_defrag_buf *defrag_buf;
 	int defrag_buf_idx;
 
 	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
 	if (defrag_buf_idx < 0) {
 		return 1;
 	}
 
 	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
 
 	/*
 	 * if the buf is not is use, then make sure it's clean
 	 */
 	if (!defrag_buf->in_use) {
 		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
 		defrag_buf->in_use = 1;
 		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
 	}
 
 	/*
 	 * update timestamp on the buffer
 	 */
 	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
 
 	/*
 	 * check if we already received this fragment
 	 */
 	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
 		/*
 		 * if we have received this fragment and we didn't clear the buffer
 		 * it means that we don't have all fragments yet
 		 */
 		return 1;
 	}
 
 	/*
 	 *  we need to handle the last packet with gloves due to its different size
 	 */
 
 	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
 		defrag_buf->last_frag_size = *len;
 
 		/*
 		 * in the event when the last packet arrives first,
 		 * we still don't know the offset vs the other fragments (based on MTU),
 		 * so we store the fragment at the end of the buffer where it's safe
 		 * and take a copy of the len so that we can restore its offset later.
 		 * remember we can't use the local MTU for this calculation because pMTU
 		 * can be asymettric between the same hosts.
 		 */
 		if (!defrag_buf->frag_size) {
 			defrag_buf->last_first = 1;
 			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
 			       inbuf->khp_data_userdata,
 			       *len);
 		}
 	} else {
 		defrag_buf->frag_size = *len;
 	}
 
 	if (defrag_buf->frag_size) {
 		memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
 		       inbuf->khp_data_userdata, *len);
 	}
 
 	defrag_buf->frag_recv++;
 	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
 
 	/*
 	 * check if we received all the fragments
 	 */
 	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
 		/*
 		 * special case the last pckt
 		 */
 
 		if (defrag_buf->last_first) {
 			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
 			        defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
 				defrag_buf->last_frag_size);
 		}
 
 		/*
 		 * recalculate packet lenght
 		 */
 
 		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
 
 		/*
 		 * copy the pckt back in the user data
 		 */
 		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
 
 		/*
 		 * free this buffer
 		 */
 		defrag_buf->in_use = 0;
 		return 0;
 	}
 
 	return 1;
 }
 
 static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
 {
 	int err = 0, savederrno = 0, stats_err = 0;
 	ssize_t outlen;
 	struct knet_host *src_host;
 	struct knet_link *src_link;
 	unsigned long long latency_last;
 	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
 	size_t dst_host_ids_entries = 0;
 	int bcast = 1;
 	uint64_t decrypt_time = 0;
 	struct timespec recvtime;
 	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
 	unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
 	ssize_t len = msg->msg_len;
 	struct knet_hostinfo *knet_hostinfo;
 	struct iovec iov_out[1];
 	int8_t channel;
 	struct sockaddr_storage pckt_src;
 	seq_num_t recv_seq_num;
 	int wipe_bufs = 0;
 	int try_decrypt = 0, decrypted = 0, i;
 
 	for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
 		if (knet_h->crypto_instance[i]) {
 			try_decrypt = 1;
 			break;
 		}
 	}
 
 	if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
 		log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
 		return;
 	}
 
 	if (try_decrypt) {
 		struct timespec start_time;
 		struct timespec end_time;
 
 		clock_gettime(CLOCK_MONOTONIC, &start_time);
 		if (crypto_authenticate_and_decrypt(knet_h,
 						    (unsigned char *)inbuf,
 						    len,
 						    knet_h->recv_from_links_buf_decrypt,
 						    &outlen) < 0) {
 			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
 			if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
 				return;
 			}
 			log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
 		} else {
 			clock_gettime(CLOCK_MONOTONIC, &end_time);
 			timespec_diff(start_time, end_time, &decrypt_time);
 
 			len = outlen;
 			inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
 			decrypted = 1;
 		}
 	}
 
 	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
 		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
 		return;
 	}
 
 	if (inbuf->kh_version != KNET_HEADER_VERSION) {
 		log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
 		return;
 	}
 
 	inbuf->kh_node = ntohs(inbuf->kh_node);
 	src_host = knet_h->host_index[inbuf->kh_node];
 	if (src_host == NULL) {  /* host not found */
 		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
 		return;
 	}
 
 	src_link = src_host->link +
 		(inbuf->khp_ping_link % KNET_MAX_LINK);
 	if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
 		if (src_link->dynamic == KNET_LINK_DYNIP) {
 			/*
 			 * cpyaddrport will only copy address and port of the incoming
 			 * packet and strip extra bits such as flow and scopeid
 			 */
 			cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
 
 			if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
 				    &pckt_src, sockaddr_len(&pckt_src)) != 0) {
 				log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
 					  src_host->host_id, src_link->link_id);
 				memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
 				if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
 						src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
 						src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
 					log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
 					snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
 					snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
 				} else {
 					log_info(knet_h, KNET_SUB_RX,
 						 "host: %u link: %u new connection established from: %s %s",
 						 src_host->host_id, src_link->link_id,
 						 src_link->status.dst_ipaddr, src_link->status.dst_port);
 				}
 			}
 			/*
 			 * transport has already accepted the connection here
 			 * otherwise we would not be receiving packets
 			 */
 			transport_link_dyn_connect(knet_h, sockfd, src_link);
 		}
 	}
 
 	stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
 	if (stats_err) {
 		log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
 			src_host->host_id, src_link->link_id, strerror(savederrno));
 		return;
 	}
 
 	switch (inbuf->kh_type) {
 	case KNET_HEADER_TYPE_HOST_INFO:
 	case KNET_HEADER_TYPE_DATA:
 		if (!src_host->status.reachable) {
 			pthread_mutex_unlock(&src_link->link_stats_mutex);
 			log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
 			return;
 		}
 		inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
 		channel = inbuf->khp_data_channel;
 		src_host->got_data = 1;
 
 		src_link->status.stats.rx_data_packets++;
 		src_link->status.stats.rx_data_bytes += len;
 
 		if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
 			pthread_mutex_unlock(&src_link->link_stats_mutex);
 			if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
 				log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
 			}
 			return;
 		}
 
 		if (inbuf->khp_data_frag_num > 1) {
 			/*
 			 * len as received from the socket also includes extra stuff
 			 * that the defrag code doesn't care about. So strip it
 			 * here and readd only for repadding once we are done
 			 * defragging
 			 */
 			len = len - KNET_HEADER_DATA_SIZE;
 			if (pckt_defrag(knet_h, inbuf, &len)) {
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				return;
 			}
 			len = len + KNET_HEADER_DATA_SIZE;
 		}
 
 		if (inbuf->khp_data_compress) {
 			ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
 			struct timespec start_time;
 			struct timespec end_time;
 			uint64_t compress_time;
 
 			clock_gettime(CLOCK_MONOTONIC, &start_time);
 			err = decompress(knet_h, inbuf->khp_data_compress,
 					 (const unsigned char *)inbuf->khp_data_userdata,
 					 len - KNET_HEADER_DATA_SIZE,
 					 knet_h->recv_from_links_buf_decompress,
 					 &decmp_outlen);
 
 			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 			if (stats_err < 0) {
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
 				return;
 			}
 
 			clock_gettime(CLOCK_MONOTONIC, &end_time);
 			timespec_diff(start_time, end_time, &compress_time);
 
 			if (!err) {
 				/* Collect stats */
 				if (compress_time < knet_h->stats.rx_compress_time_min) {
 					knet_h->stats.rx_compress_time_min = compress_time;
 				}
 				if (compress_time > knet_h->stats.rx_compress_time_max) {
 					knet_h->stats.rx_compress_time_max = compress_time;
 				}
 				knet_h->stats.rx_compress_time_ave =
 					(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
 					 compress_time) / (knet_h->stats.rx_compressed_packets+1);
 
 				knet_h->stats.rx_compressed_packets++;
 				knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
 				knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
 
 				memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
 				len = decmp_outlen + KNET_HEADER_DATA_SIZE;
 			} else {
 				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
 					 err, strerror(errno));
 				return;
 			}
 			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 		}
 
 		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
 			if (decrypted) {
 				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 				if (stats_err < 0) {
 					pthread_mutex_unlock(&src_link->link_stats_mutex);
 					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
 					return;
 				}
 				/* Only update the crypto overhead for data packets. Mainly to be
 				   consistent with TX */
 				if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
 					knet_h->stats.rx_crypt_time_min = decrypt_time;
 				}
 				if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
 					knet_h->stats.rx_crypt_time_max = decrypt_time;
 				}
 				knet_h->stats.rx_crypt_time_ave =
 					(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
 					 decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
 				knet_h->stats.rx_crypt_packets++;
 				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 			}
 
 			if (knet_h->enabled != 1) /* data forward is disabled */
 				break;
 
 			if (knet_h->dst_host_filter_fn) {
 				size_t host_idx;
 				int found = 0;
 
 				bcast = knet_h->dst_host_filter_fn(
 						knet_h->dst_host_filter_fn_private_data,
 						(const unsigned char *)inbuf->khp_data_userdata,
 						len - KNET_HEADER_DATA_SIZE,
 						KNET_NOTIFY_RX,
 						knet_h->host_id,
 						inbuf->kh_node,
 						&channel,
 						dst_host_ids,
 						&dst_host_ids_entries);
 				if (bcast < 0) {
 					pthread_mutex_unlock(&src_link->link_stats_mutex);
 					log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
 					return;
 				}
 
 				if ((!bcast) && (!dst_host_ids_entries)) {
 					pthread_mutex_unlock(&src_link->link_stats_mutex);
 					log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
 					return;
 				}
 
 				/* check if we are dst for this packet */
 				if (!bcast) {
 					if (dst_host_ids_entries > KNET_MAX_HOST) {
 						pthread_mutex_unlock(&src_link->link_stats_mutex);
 						log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
 						return;
 					}
 					for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
 						if (dst_host_ids[host_idx] == knet_h->host_id) {
 							found = 1;
 							break;
 						}
 					}
 					if (!found) {
 						pthread_mutex_unlock(&src_link->link_stats_mutex);
 						log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
 						return;
 					}
 				}
 			}
 		}
 
 		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
 			if (!knet_h->sockfd[channel].in_use) {
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				log_debug(knet_h, KNET_SUB_RX,
 					  "received packet for channel %d but there is no local sock connected",
 					  channel);
 				return;
 			}
 
 			outlen = 0;
 			memset(iov_out, 0, sizeof(iov_out));
 
 retry:
 			iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
 			iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
 
 			outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
 			if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
 				log_debug(knet_h, KNET_SUB_RX,
 					  "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
 					  iov_out[0].iov_len, outlen);
 				goto retry;
 			}
 
 			if (outlen <= 0) {
 				knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
 						       knet_h->sockfd[channel].sockfd[0],
 						       channel,
 						       KNET_NOTIFY_RX,
 						       outlen,
 						       errno);
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				return;
 			}
 			if ((size_t)outlen == iov_out[0].iov_len) {
 				_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
 			}
 		} else { /* HOSTINFO */
 			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
 			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
 				knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
 			}
 			if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 				return;
 			}
 			_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
 			switch(knet_hostinfo->khi_type) {
 				case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
 					break;
 				case KNET_HOSTINFO_TYPE_LINK_TABLE:
 					break;
 				default:
 					log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
 					break;
 			}
 		}
 		break;
 	case KNET_HEADER_TYPE_PING:
 		outlen = KNET_HEADER_PING_SIZE;
 		inbuf->kh_type = KNET_HEADER_TYPE_PONG;
 		inbuf->kh_node = htons(knet_h->host_id);
 		recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
 		src_link->status.stats.rx_ping_packets++;
 		src_link->status.stats.rx_ping_bytes += len;
 
 		wipe_bufs = 0;
 
 		if (!inbuf->khp_ping_timed) {
 			/*
 			 * we might be receiving this message from all links, but we want
 			 * to process it only the first time
 			 */
 			if (recv_seq_num != src_host->untimed_rx_seq_num) {
 				/*
 				 * cache the untimed seq num
 				 */
 				src_host->untimed_rx_seq_num = recv_seq_num;
 				/*
 				 * if the host has received data in between
 				 * untimed ping, then we don't need to wipe the bufs
 				 */
 				if (src_host->got_data) {
 					src_host->got_data = 0;
 					wipe_bufs = 0;
 				} else {
 					wipe_bufs = 1;
 				}
 			}
 			_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
 		} else {
 			/*
 			 * pings always arrives in bursts over all the link
 			 * catch the first of them to cache the seq num and
 			 * avoid duplicate processing
 			 */
 			if (recv_seq_num != src_host->timed_rx_seq_num) {
 				src_host->timed_rx_seq_num = recv_seq_num;
 
 				if (recv_seq_num == 0) {
 					_seq_num_lookup(src_host, recv_seq_num, 0, 1);
 				}
 			}
 		}
 
 		if (knet_h->crypto_in_use_config) {
 			if (crypto_encrypt_and_sign(knet_h,
 						    (const unsigned char *)inbuf,
 						    outlen,
 						    knet_h->recv_from_links_buf_crypt,
 						    &outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
 				break;
 			}
 			outbuf = knet_h->recv_from_links_buf_crypt;
 			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 			if (stats_err < 0) {
 				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
 				break;
 			}
 			knet_h->stats_extra.tx_crypt_pong_packets++;
 			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 		}
 
 retry_pong:
 		if (src_link->transport_connected) {
 			if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
 				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
 					     (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
 			} else {
 				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
 			}
 			savederrno = errno;
 			if (len != outlen) {
 				err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
 				switch(err) {
 					case -1: /* unrecoverable error */
 						log_debug(knet_h, KNET_SUB_RX,
 							  "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
 							  src_link->outsock, errno, strerror(errno),
 							  src_link->status.src_ipaddr, src_link->status.src_port,
 							  src_link->status.dst_ipaddr, src_link->status.dst_port);
 						src_link->status.stats.tx_pong_errors++;
 						break;
 					case 0: /* ignore error and continue */
 						break;
 					case 1: /* retry to send those same data */
 						src_link->status.stats.tx_pong_retries++;
 						goto retry_pong;
 						break;
 				}
 			}
 			src_link->status.stats.tx_pong_packets++;
 			src_link->status.stats.tx_pong_bytes += outlen;
 		}
 		break;
 	case KNET_HEADER_TYPE_PONG:
 		src_link->status.stats.rx_pong_packets++;
 		src_link->status.stats.rx_pong_bytes += len;
 		clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
 
 		memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
 		timespec_diff(recvtime,
 				src_link->status.pong_last, &latency_last);
 
 		if ((latency_last / 1000llu) > src_link->pong_timeout) {
 			log_debug(knet_h, KNET_SUB_RX,
 				  "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
 				  src_host->host_id, src_link->link_id);
 		} else {
 
 			/*
 			 * in words : ('previous mean' * '(count -1)') + 'new value') / 'count'
 			 */
 
 			src_link->latency_cur_samples++;
 
 			/*
 			 * limit to max_samples (precision)
 			 */
 			if (src_link->latency_cur_samples >= src_link->latency_max_samples) {
 				src_link->latency_cur_samples = src_link->latency_max_samples;
 			}
 			src_link->status.latency =
 				(((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples);
 
 			if (src_link->status.latency < src_link->pong_timeout_adj) {
 				if (!src_link->status.connected) {
 					if (src_link->received_pong >= src_link->pong_count) {
 						log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
 							 src_host->host_id, src_link->link_id);
 						_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
 					} else {
 						src_link->received_pong++;
 						log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
 							  src_host->host_id, src_link->link_id, src_link->received_pong);
 					}
 				}
 			}
 			/* Calculate latency stats */
 			if (src_link->status.latency > src_link->status.stats.latency_max) {
 				src_link->status.stats.latency_max = src_link->status.latency;
 			}
 			if (src_link->status.latency < src_link->status.stats.latency_min) {
 				src_link->status.stats.latency_min = src_link->status.latency;
 			}
 
 			/*
 			 * those 2 lines below make all latency average calculations consistent and capped to
 			 * link precision. In future we will kill the one above to keep only this one in
 			 * the stats structure, but for now we leave it around to avoid API/ABI
 			 * breakage as we backport the fixes to stable
 			 */
 			src_link->status.stats.latency_ave = src_link->status.latency;
 			src_link->status.stats.latency_samples = src_link->latency_cur_samples;
 		}
 		break;
 	case KNET_HEADER_TYPE_PMTUD:
 		src_link->status.stats.rx_pmtu_packets++;
 		src_link->status.stats.rx_pmtu_bytes += len;
 		outlen = KNET_HEADER_PMTUD_SIZE;
 		inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
 		inbuf->kh_node = htons(knet_h->host_id);
 
 		if (knet_h->crypto_in_use_config) {
 			if (crypto_encrypt_and_sign(knet_h,
 						    (const unsigned char *)inbuf,
 						    outlen,
 						    knet_h->recv_from_links_buf_crypt,
 						    &outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
 				break;
 			}
 			outbuf = knet_h->recv_from_links_buf_crypt;
 			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 			if (stats_err < 0) {
 				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
 				break;
 			}
 			knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
 			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 		}
 
 		/* Unlock so we don't deadlock with tx_mutex */
 		pthread_mutex_unlock(&src_link->link_stats_mutex);
 
 		savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
 		if (savederrno) {
 			log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
 			goto out_pmtud;
 		}
 retry_pmtud:
 		if (src_link->transport_connected) {
 			if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
 				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
 					     (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
 			} else {
 				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
 			}
 			savederrno = errno;
 			if (len != outlen) {
 				err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
 				stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
 				if (stats_err < 0) {
 					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
 					break;
 				}
 				switch(err) {
 					case -1: /* unrecoverable error */
 						log_debug(knet_h, KNET_SUB_RX,
 							  "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
 							  src_link->outsock, errno, strerror(errno),
 							  src_link->status.src_ipaddr, src_link->status.src_port,
 							  src_link->status.dst_ipaddr, src_link->status.dst_port);
 
 						src_link->status.stats.tx_pmtu_errors++;
 						break;
 					case 0: /* ignore error and continue */
 						src_link->status.stats.tx_pmtu_errors++;
 						break;
 					case 1: /* retry to send those same data */
 						src_link->status.stats.tx_pmtu_retries++;
 						pthread_mutex_unlock(&src_link->link_stats_mutex);
 						goto retry_pmtud;
 						break;
 				}
 				pthread_mutex_unlock(&src_link->link_stats_mutex);
 			}
 		}
 		pthread_mutex_unlock(&knet_h->tx_mutex);
 out_pmtud:
 		return; /* Don't need to unlock link_stats_mutex */
 	case KNET_HEADER_TYPE_PMTUD_REPLY:
 		src_link->status.stats.rx_pmtu_packets++;
 		src_link->status.stats.rx_pmtu_bytes += len;
 
 		/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
 		pthread_mutex_unlock(&src_link->link_stats_mutex);
 
 		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
 			log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
 			break;
 		}
 		src_link->last_recv_mtu = inbuf->khp_pmtud_size;
 		pthread_cond_signal(&knet_h->pmtud_cond);
 		pthread_mutex_unlock(&knet_h->pmtud_mutex);
 		return;
 	default:
 		pthread_mutex_unlock(&src_link->link_stats_mutex);
 		return;
 	}
 	pthread_mutex_unlock(&src_link->link_stats_mutex);
 }
 
 static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
 {
 	int err, savederrno;
 	int i, msg_recv, transport;
 
 	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
 		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
 		return;
 	}
 
 	if (_is_valid_fd(knet_h, sockfd) < 1) {
 		/*
 		 * this is normal if a fd got an event and before we grab the read lock
 		 * and the link is removed by another thread
 		 */
 		goto exit_unlock;
 	}
 
 	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
 
 	/*
 	 * reset msg_namelen to buffer size because after recvmmsg
 	 * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
 	 */
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 	}
 
 	msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
 	savederrno = errno;
 
 	/*
 	 * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
 	 * recvmmsg can return:
 	 * -1 on error
 	 *  0 if the previous run of recvmmsg recorded an error on the socket
 	 *  N number of messages (see exception below).
 	 *
 	 * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
 	 * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
 	 * it will be visibile in the next run.
 	 *
 	 * Need to be careful how we handle errors at this stage.
 	 *
 	 * error messages need to be handled on a per transport/protocol base
 	 * at this point we have different layers of error handling
 	 * - msg_recv < 0 -> error from this run
 	 *   msg_recv = 0 -> error from previous run and error on socket needs to be cleared
 	 * - per-transport message data
 	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
 	 *            but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
 	 * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
 	 *         errno set. That means the error api needs to be able to abort the loop below.
 	 */
 
 	if (msg_recv <= 0) {
 		transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
 		goto exit_unlock;
 	}
 
 	for (i = 0; i < msg_recv; i++) {
 		err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
 
 		/*
 		 * TODO: make this section silent once we are confident
 		 *       all protocols packet handlers are good
 		 */
 
 		switch(err) {
 			case KNET_TRANSPORT_RX_ERROR: /* on error */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
 				goto exit_unlock;
 				break;
 			case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
 				break;
 			case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
 				goto exit_unlock;
 				break;
 			case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
 				/*
 				 * processing incoming packets vs access lists
 				 */
 				if ((knet_h->use_access_lists) &&
 				    (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
 					if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
 						char src_ipaddr[KNET_MAX_HOST_LEN];
 						char src_port[KNET_MAX_PORT_LEN];
 
 						memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
 						memset(src_port, 0, KNET_MAX_PORT_LEN);
 						if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
 								   src_ipaddr, KNET_MAX_HOST_LEN,
 								   src_port, KNET_MAX_PORT_LEN) < 0) {
 
 							log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
 						} else {
 							log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
 						}
 						/*
 						 * continue processing the other packets
 						 */
 						continue;
 					}
 				}
 				_parse_recv_from_links(knet_h, sockfd, &msg[i]);
 				break;
 			case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
 				log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
 				break;
 			case KNET_TRANSPORT_RX_OOB_DATA_STOP:
 				log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
 				goto exit_unlock;
 				break;
 		}
 	}
 
 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 }
 
 void *_handle_recv_from_links_thread(void *data)
 {
 	int i, nev;
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 	struct sockaddr_storage address[PCKT_RX_BUFS];
 	struct knet_mmsghdr msg[PCKT_RX_BUFS];
 	struct iovec iov_in[PCKT_RX_BUFS];
 
 	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
 
 	memset(&msg, 0, sizeof(msg));
+	memset(&events, 0, sizeof(events));
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
 		iov_in[i].iov_len = KNET_DATABUFSIZE;
 
 		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
 
 		msg[i].msg_hdr.msg_name = &address[i];
 		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[i].msg_hdr.msg_iov = &iov_in[i];
 		msg[i].msg_hdr.msg_iovlen = 1;
 	}
 
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
 
 		/*
 		 * the RX threads only need to notify that there has been at least
 		 * one successful run after queue flush has been requested.
 		 * See setfwd in handle.c
 		 */
 		if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
 			set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
 		}
 
 		/*
 		 * we use timeout to detect if thread is shutting down
 		 */
 		if (nev == 0) {
 			continue;
 		}
 
 		for (i = 0; i < nev; i++) {
 			_handle_recv_from_links(knet_h, events[i].data.fd, msg);
 		}
 	}
 
 	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
 
 	return NULL;
 }
 
 ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0;
 	ssize_t err = 0;
 	struct iovec iov_in;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	memset(&iov_in, 0, sizeof(iov_in));
 	iov_in.iov_base = (void *)buff;
 	iov_in.iov_len = buff_len;
 
 	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
 	savederrno = errno;
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = err ? savederrno : 0;
 	return err;
 }
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 852064c2..615c4266 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,901 +1,902 @@
 /*
  * Copyright (C) 2012-2020 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <math.h>
 #include <string.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/uio.h>
 #include <errno.h>
 
 #include "compat.h"
 #include "compress.h"
 #include "crypto.h"
 #include "host.h"
 #include "link.h"
 #include "logging.h"
 #include "transports.h"
 #include "transport_common.h"
 #include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_tx.h"
 #include "netutils.h"
 
 /*
  * SEND
  */
 
 static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
 {
 	int link_idx, msg_idx, sent_msgs, prev_sent, progress;
 	int err = 0, savederrno = 0, locked = 0;
 	unsigned int i;
 	struct knet_mmsghdr *cur;
 	struct knet_link *cur_link;
 
 	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
 		prev_sent = 0;
 		progress = 1;
 		locked = 0;
 
 		cur_link = &dst_host->link[dst_host->active_links[link_idx]];
 
 		if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
 			continue;
 		}
 
 		savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
 		if (savederrno) {
 			log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
 				dst_host->host_id, cur_link->link_id, strerror(savederrno));
 			continue;
 		}
 		locked = 1;
 
 		msg_idx = 0;
 		while (msg_idx < msgs_to_send) {
 			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
 
 			/* Cast for Linux/BSD compatibility */
 			for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
 				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
 			}
 			cur_link->status.stats.tx_data_packets++;
 			msg_idx++;
 		}
 
 retry:
 		cur = &msg[prev_sent];
 
 		sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
 				      transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport),
 				      &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
 		savederrno = errno;
 
 		err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
 		switch(err) {
 			case -1: /* unrecoverable error */
 				cur_link->status.stats.tx_data_errors++;
 				goto out_unlock;
 				break;
 			case 0: /* ignore error and continue */
 				break;
 			case 1: /* retry to send those same data */
 				cur_link->status.stats.tx_data_retries++;
 				goto retry;
 				break;
 		}
 
 		prev_sent = prev_sent + sent_msgs;
 
 		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
 			if ((sent_msgs) || (progress)) {
 				if (sent_msgs) {
 					progress = 1;
 				} else {
 					progress = 0;
 				}
 #ifdef DEBUG
 				log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
 					  sent_msgs, msg_idx,
 					  dst_host->name, dst_host->host_id,
 					  dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
 					  dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
 					  dst_host->link[dst_host->active_links[link_idx]].link_id);
 #endif
 				goto retry;
 			}
 			if (!progress) {
 				savederrno = EAGAIN;
 				err = -1;
 				goto out_unlock;
 			}
 		}
 
 		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
 		    (dst_host->active_link_entries > 1)) {
 			uint8_t cur_link_id = dst_host->active_links[0];
 
 			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
 			dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
 
 			break;
 		}
 		pthread_mutex_unlock(&cur_link->link_stats_mutex);
 		locked = 0;
 	}
 
 out_unlock:
 	if (locked) {
 		pthread_mutex_unlock(&cur_link->link_stats_mutex);
 	}
 	errno = savederrno;
 	return err;
 }
 
 static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
 {
 	size_t outlen, frag_len;
 	struct knet_host *dst_host;
 	knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
 	size_t dst_host_ids_entries_temp = 0;
 	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
 	size_t dst_host_ids_entries = 0;
 	int bcast = 1;
 	struct knet_hostinfo *knet_hostinfo;
 	struct iovec iov_out[PCKT_FRAG_MAX][2];
 	int iovcnt_out = 2;
 	uint8_t frag_idx;
 	unsigned int temp_data_mtu;
 	size_t host_idx;
 	int send_mcast = 0;
 	struct knet_header *inbuf;
 	int savederrno = 0;
 	int err = 0;
 	seq_num_t tx_seq_num;
 	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
 	int msgs_to_send, msg_idx;
 	unsigned int i;
 	int j;
 	int send_local = 0;
 	int data_compressed = 0;
 	size_t uncrypted_frag_size;
 	int stats_locked = 0, stats_err = 0;
 
 	inbuf = knet_h->recv_from_sock_buf;
 
 	if ((knet_h->enabled != 1) &&
 	    (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
 		log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
 		savederrno = ECANCELED;
 		err = -1;
 		goto out_unlock;
 	}
 
 	/*
 	 * move this into a separate function to expand on
 	 * extra switching rules
 	 */
 	switch(inbuf->kh_type) {
 		case KNET_HEADER_TYPE_DATA:
 			if (knet_h->dst_host_filter_fn) {
 				bcast = knet_h->dst_host_filter_fn(
 						knet_h->dst_host_filter_fn_private_data,
 						(const unsigned char *)inbuf->khp_data_userdata,
 						inlen,
 						KNET_NOTIFY_TX,
 						knet_h->host_id,
 						knet_h->host_id,
 						&channel,
 						dst_host_ids_temp,
 						&dst_host_ids_entries_temp);
 				if (bcast < 0) {
 					log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
 					savederrno = EFAULT;
 					err = -1;
 					goto out_unlock;
 				}
 
 				if ((!bcast) && (!dst_host_ids_entries_temp)) {
 					log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
 					savederrno = EINVAL;
 					err = -1;
 					goto out_unlock;
 				}
 
 				if ((!bcast) &&
 				    (dst_host_ids_entries_temp > KNET_MAX_HOST)) {
 					log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
 					savederrno = EINVAL;
 					err = -1;
 					goto out_unlock;
 				}
 			}
 
 			/* Send to localhost if appropriate and enabled */
 			if (knet_h->has_loop_link) {
 				send_local = 0;
 				if (bcast) {
 					send_local = 1;
 				} else {
 					for (i=0; i< dst_host_ids_entries_temp; i++) {
 						if (dst_host_ids_temp[i] == knet_h->host_id) {
 							send_local = 1;
 						}
 					}
 				}
 				if (send_local) {
 					const unsigned char *buf = inbuf->khp_data_userdata;
 					ssize_t buflen = inlen;
 					struct knet_link *local_link;
 
 					local_link = knet_h->host_index[knet_h->host_id]->link;
 
 				local_retry:
 					err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
 					if (err < 0) {
 						log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
 						local_link->status.stats.tx_data_errors++;
 					}
 					if (err > 0 && err < buflen) {
 						log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
 						local_link->status.stats.tx_data_retries++;
 						buf += err;
 						buflen -= err;
 						goto local_retry;
 					}
 					if (err == buflen) {
 						local_link->status.stats.tx_data_packets++;
 						local_link->status.stats.tx_data_bytes += inlen;
 					}
 				}
 			}
 			break;
 		case KNET_HEADER_TYPE_HOST_INFO:
 			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
 			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
 				bcast = 0;
 				dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
 				dst_host_ids_entries_temp = 1;
 				knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
 			}
 			break;
 		default:
 			log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
 			savederrno = ENOMSG;
 			err = -1;
 			goto out_unlock;
 			break;
 	}
 
 	if (is_sync) {
 		if ((bcast) ||
 		    ((!bcast) && (dst_host_ids_entries_temp > 1))) {
 			log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
 			savederrno = E2BIG;
 			err = -1;
 			goto out_unlock;
 		}
 	}
 
 	/*
 	 * check destinations hosts before spending time
 	 * in fragmenting/encrypting packets to save
 	 * time processing data for unreachable hosts.
 	 * for unicast, also remap the destination data
 	 * to skip unreachable hosts.
 	 */
 
 	if (!bcast) {
 		dst_host_ids_entries = 0;
 		for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
 			dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
 			if (!dst_host) {
 				continue;
 			}
 			if (!(dst_host->host_id == knet_h->host_id &&
 			     knet_h->has_loop_link) &&
 			    dst_host->status.reachable) {
 				dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
 				dst_host_ids_entries++;
 			}
 		}
 		if (!dst_host_ids_entries) {
 			savederrno = EHOSTDOWN;
 			err = -1;
 			goto out_unlock;
 		}
 	} else {
 		send_mcast = 0;
 		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
 			if (!(dst_host->host_id == knet_h->host_id &&
 			      knet_h->has_loop_link) &&
 			    dst_host->status.reachable) {
 				send_mcast = 1;
 				break;
 			}
 		}
 		if (!send_mcast) {
 			savederrno = EHOSTDOWN;
 			err = -1;
 			goto out_unlock;
 		}
 	}
 
 	if (!knet_h->data_mtu) {
 		/*
 		 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
 		 */
 		log_debug(knet_h, KNET_SUB_TX,
 			  "Received data packet but data MTU is still unknown."
 			  " Packet might not be delivered."
 			  " Assuming minimum IPv4 MTU (%d)",
 			  KNET_PMTUD_MIN_MTU_V4);
 		temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
 	} else {
 		/*
 		 * take a copy of the mtu to avoid value changing under
 		 * our feet while we are sending a fragmented pckt
 		 */
 		temp_data_mtu = knet_h->data_mtu;
 	}
 
 	/*
 	 * compress data
 	 */
 	if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
 		size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
 		struct timespec start_time;
 		struct timespec end_time;
 		uint64_t compress_time;
 
 		clock_gettime(CLOCK_MONOTONIC, &start_time);
 		err = compress(knet_h,
 			       (const unsigned char *)inbuf->khp_data_userdata, inlen,
 			       knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
 
 		savederrno = errno;
 
 		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 		if (stats_err < 0) {
 			log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
 			err = -1;
 			savederrno = stats_err;
 			goto out_unlock;
 		}
 		stats_locked = 1;
 		/* Collect stats */
 		clock_gettime(CLOCK_MONOTONIC, &end_time);
 		timespec_diff(start_time, end_time, &compress_time);
 
 		if (compress_time < knet_h->stats.tx_compress_time_min) {
 			knet_h->stats.tx_compress_time_min = compress_time;
 		}
 		if (compress_time > knet_h->stats.tx_compress_time_max) {
 			knet_h->stats.tx_compress_time_max = compress_time;
 		}
 		knet_h->stats.tx_compress_time_ave =
 			(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
 			 compress_time) / (knet_h->stats.tx_compressed_packets+1);
 		if (err < 0) {
 			log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
 		} else {
 			knet_h->stats.tx_compressed_packets++;
 			knet_h->stats.tx_compressed_original_bytes += inlen;
 			knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
 
 			if (cmp_outlen < inlen) {
 				memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
 				inlen = cmp_outlen;
 				data_compressed = 1;
 			}
 		}
 	}
 	if (!stats_locked) {
 		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 		if (stats_err < 0) {
 			log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
 			err = -1;
 			savederrno = stats_err;
 			goto out_unlock;
 		}
 	}
 	if (knet_h->compress_model > 0 && !data_compressed) {
 		knet_h->stats.tx_uncompressed_packets++;
 	}
 	pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 	stats_locked = 0;
 
 	/*
 	 * prepare the outgoing buffers
 	 */
 
 	frag_len = inlen;
 	frag_idx = 0;
 
 	inbuf->khp_data_bcast = bcast;
 	inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
 	inbuf->khp_data_channel = channel;
 	if (data_compressed) {
 		inbuf->khp_data_compress = knet_h->compress_model;
 	} else {
 		inbuf->khp_data_compress = 0;
 	}
 
 	if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
 		log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
 		goto out_unlock;
 	}
 	knet_h->tx_seq_num++;
 	/*
 	 * force seq_num 0 to detect a node that has crashed and rejoining
 	 * the knet instance. seq_num 0 will clear the buffers in the RX
 	 * thread
 	 */
 	if (knet_h->tx_seq_num == 0) {
 		knet_h->tx_seq_num++;
 	}
 	/*
 	 * cache the value in locked context
 	 */
 	tx_seq_num = knet_h->tx_seq_num;
 	inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
 	pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
 
 	/*
 	 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
 	 * pckts.
 	 * this solves 2 problems:
 	 * 1) on TX socket overloads we generate extra pings to keep links alive
 	 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
 	 *    node 3+ will be able to keep in sync on the TX seq_num even without
 	 *    receiving traffic or pings in betweens. This avoids issues with
 	 *    rollover of the circular buffer
 	 */
 
 	if (tx_seq_num % (SEQ_MAX / 8) == 0) {
 		_send_pings(knet_h, 0);
 	}
 
 	if (inbuf->khp_data_frag_num > 1) {
 		while (frag_idx < inbuf->khp_data_frag_num) {
 			/*
 			 * set the iov_base
 			 */
 			iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
 			iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
 			iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
 
 			/*
 			 * set the len
 			 */
 			if (frag_len > temp_data_mtu) {
 				iov_out[frag_idx][1].iov_len = temp_data_mtu;
 			} else {
 				iov_out[frag_idx][1].iov_len = frag_len;
 			}
 
 			/*
 			 * copy the frag info on all buffers
 			 */
 			knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
 
 			frag_len = frag_len - temp_data_mtu;
 			frag_idx++;
 		}
 		iovcnt_out = 2;
 	} else {
 		iov_out[frag_idx][0].iov_base = (void *)inbuf;
 		iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
 		iovcnt_out = 1;
 	}
 
 	if (knet_h->crypto_in_use_config) {
 		struct timespec start_time;
 		struct timespec end_time;
 		uint64_t crypt_time;
 
 		frag_idx = 0;
 		while (frag_idx < inbuf->khp_data_frag_num) {
 			clock_gettime(CLOCK_MONOTONIC, &start_time);
 			if (crypto_encrypt_and_signv(
 					knet_h,
 					iov_out[frag_idx], iovcnt_out,
 					knet_h->send_to_links_buf_crypt[frag_idx],
 					(ssize_t *)&outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
 				savederrno = ECHILD;
 				err = -1;
 				goto out_unlock;
 			}
 			clock_gettime(CLOCK_MONOTONIC, &end_time);
 			timespec_diff(start_time, end_time, &crypt_time);
 
 			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
 			if (stats_err < 0) {
 				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
 				err = -1;
 				savederrno = stats_err;
 				goto out_unlock;
 			}
 
 			if (crypt_time < knet_h->stats.tx_crypt_time_min) {
 				knet_h->stats.tx_crypt_time_min = crypt_time;
 			}
 			if (crypt_time > knet_h->stats.tx_crypt_time_max) {
 				knet_h->stats.tx_crypt_time_max = crypt_time;
 			}
 			knet_h->stats.tx_crypt_time_ave =
 				(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
 				 crypt_time) / (knet_h->stats.tx_crypt_packets+1);
 
 			uncrypted_frag_size = 0;
 			for (j=0; j < iovcnt_out; j++) {
 				uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
 			}
 			knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
 			knet_h->stats.tx_crypt_packets++;
 			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 
 			iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
 			iov_out[frag_idx][0].iov_len = outlen;
 			frag_idx++;
 		}
 		iovcnt_out = 1;
 	}
 
 	memset(&msg, 0, sizeof(msg));
 
 	msgs_to_send = inbuf->khp_data_frag_num;
 
 	msg_idx = 0;
 
 	while (msg_idx < msgs_to_send) {
 		msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
 		msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
 		msg_idx++;
 	}
 
 	if (!bcast) {
 		for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
 			dst_host = knet_h->host_index[dst_host_ids[host_idx]];
 
 			err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 			savederrno = errno;
 			if (err) {
 				goto out_unlock;
 			}
 		}
 	} else {
 		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
 			if (dst_host->status.reachable) {
 				err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 				savederrno = errno;
 				if (err) {
 					goto out_unlock;
 				}
 			}
 		}
 	}
 
 out_unlock:
 	errno = savederrno;
 	return err;
 }
 
 static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
 {
 	ssize_t inlen = 0;
 	int savederrno = 0, docallback = 0;
 
 	if ((channel >= 0) &&
 	    (channel < KNET_DATAFD_MAX) &&
 	    (!knet_h->sockfd[channel].is_socket)) {
 		inlen = readv(sockfd, msg->msg_iov, 1);
 	} else {
 		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
 		if (msg->msg_flags & MSG_TRUNC) {
 			log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
 			return;
 		}
 	}
 
 	if (inlen == 0) {
 		savederrno = 0;
 		docallback = 1;
 	} else if (inlen < 0) {
 		struct epoll_event ev;
 
 		savederrno = errno;
 		docallback = 1;
 		memset(&ev, 0, sizeof(struct epoll_event));
 
 		if (channel != KNET_INTERNAL_DATA_CHANNEL) {
 			if (epoll_ctl(knet_h->send_to_links_epollfd,
 				      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
 				log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
 					knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
 			} else {
 				knet_h->sockfd[channel].has_error = 1;
 			}
 		}
 		/*
 		 * TODO: add error handling for KNET_INTERNAL_DATA_CHANNEL
 		 *       once we add support for internal knet communication
 		 */
 	} else {
 		knet_h->recv_from_sock_buf->kh_type = type;
 		_parse_recv_from_sock(knet_h, inlen, channel, 0);
 	}
 
 	if ((docallback) && (channel != KNET_INTERNAL_DATA_CHANNEL)) {
 		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
 				       knet_h->sockfd[channel].sockfd[0],
 				       channel,
 				       KNET_NOTIFY_TX,
 				       inlen,
 				       savederrno);
 	}
 }
 
 void *_handle_send_to_links_thread(void *data)
 {
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 	int i, nev, type;
 	int flush, flush_queue_limit;
 	int8_t channel;
 	struct iovec iov_in;
 	struct msghdr msg;
 	struct sockaddr_storage address;
 
 	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);
 
+	memset(&events, 0, sizeof(events));
 	memset(&iov_in, 0, sizeof(iov_in));
 	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
 	iov_in.iov_len = KNET_MAX_PACKET_SIZE;
 
 	memset(&msg, 0, sizeof(struct msghdr));
 	msg.msg_name = &address;
 	msg.msg_namelen = sizeof(struct sockaddr_storage);
 	msg.msg_iov = &iov_in;
 	msg.msg_iovlen = 1;
 
 	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
 	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
 	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
 		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
 		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
 	}
 
 	flush_queue_limit = 0;
 
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000);
 
 		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
 
 		/*
 		 * we use timeout to detect if thread is shutting down
 		 */
 		if (nev == 0) {
 			/*
 			 * ideally we want to communicate that we are done flushing
 			 * the queue when we have an epoll timeout event
 			 */
 			if (flush == KNET_THREAD_QUEUE_FLUSH) {
 				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
 				flush_queue_limit = 0;
 			}
 			continue;
 		}
 
 		/*
 		 * fall back in case the TX sockets will continue receive traffic
 		 * and we do not hit an epoll timeout.
 		 *
 		 * allow up to a 100 loops to flush queues, then we give up.
 		 * there might be more clean ways to do it by checking the buffer queue
 		 * on each socket, but we have tons of sockets and calculations can go wrong.
 		 * Also, why would you disable data forwarding and still send packets?
 		 */
 		if (flush == KNET_THREAD_QUEUE_FLUSH) {
 			if (flush_queue_limit >= 100) {
 				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
 				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
 				flush_queue_limit = 0;
 			} else {
 				flush_queue_limit++;
 			}
 		} else {
 			flush_queue_limit = 0;
 		}
 
 		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
 			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
 			continue;
 		}
 
 		for (i = 0; i < nev; i++) {
 			if (events[i].data.fd == knet_h->hostsockfd[0]) {
 				type = KNET_HEADER_TYPE_HOST_INFO;
 				channel = KNET_INTERNAL_DATA_CHANNEL;
 			} else {
 				type = KNET_HEADER_TYPE_DATA;
 				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
 					if ((knet_h->sockfd[channel].in_use) &&
 					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
 						break;
 					}
 				}
 				if (channel >= KNET_DATAFD_MAX) {
 					log_debug(knet_h, KNET_SUB_TX, "No available channels");
 					continue; /* channel not found */
 				}
 			}
 			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
 				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
 				continue;
 			}
 			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
 			pthread_mutex_unlock(&knet_h->tx_mutex);
 		}
 
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 	}
 
 	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);
 
 	return NULL;
 }
 
 int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0, err = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out;
 	}
 
 	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
 			strerror(savederrno));
 		err = -1;
 		goto out;
 	}
 
 	knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
 	memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
 	err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
 	savederrno = errno;
 
 	pthread_mutex_unlock(&knet_h->tx_mutex);
 
 out:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	errno = err ? savederrno : 0;
 	return err;
 }
 
 ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0;
 	ssize_t err = 0;
 	struct iovec iov_out[1];
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	memset(iov_out, 0, sizeof(iov_out));
 
 	iov_out[0].iov_base = (void *)buff;
 	iov_out[0].iov_len = buff_len;
 
 	err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
 	savederrno = errno;
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = err ? savederrno : 0;
 	return err;
 }
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index 17e2f542..705b6ca7 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1602 +1,1606 @@
 /*
  * Copyright (C) 2016-2020 Red Hat, Inc.  All rights reserved.
  *
  * Author: Christine Caulfield <ccaulfie@redhat.com>
  *
  * This software licensed under LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <string.h>
 #include <unistd.h>
 #include <errno.h>
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <stdlib.h>
 #include <assert.h>
 
 #include "compat.h"
 #include "host.h"
 #include "links.h"
 #include "links_acl.h"
 #include "links_acl_ip.h"
 #include "logging.h"
 #include "netutils.h"
 #include "common.h"
 #include "transport_common.h"
 #include "transports.h"
 #include "threads_common.h"
 
 #ifdef HAVE_NETINET_SCTP_H
 #include <netinet/sctp.h>
 #include "transport_sctp.h"
 
 typedef struct sctp_handle_info {
 	struct qb_list_head listen_links_list;
 	struct qb_list_head connect_links_list;
 	int connect_epollfd;
 	int connectsockfd[2];
 	int listen_epollfd;
 	int listensockfd[2];
 	pthread_t connect_thread;
 	pthread_t listen_thread;
 	socklen_t event_subscribe_kernel_size;
 	char *event_subscribe_buffer;
 } sctp_handle_info_t;
 
 /*
  * use by fd_tracker data type
  */
 #define SCTP_NO_LINK_INFO       0
 #define SCTP_LISTENER_LINK_INFO 1
 #define SCTP_ACCEPTED_LINK_INFO 2
 #define SCTP_CONNECT_LINK_INFO  3
 
 /*
  * this value is per listener
  */
 #define MAX_ACCEPTED_SOCKS 256
 
 typedef struct sctp_listen_link_info {
 	struct qb_list_head list;
 	int listen_sock;
 	int accepted_socks[MAX_ACCEPTED_SOCKS];
 	struct sockaddr_storage src_address;
 	int on_listener_epoll;
 	int on_rx_epoll;
 	int sock_shutdown;
 } sctp_listen_link_info_t;
 
 typedef struct sctp_accepted_link_info {
 	char mread_buf[KNET_DATABUFSIZE];
 	ssize_t mread_len;
 	sctp_listen_link_info_t *link_info;
 } sctp_accepted_link_info_t ;
 
 typedef struct sctp_connect_link_info {
 	struct qb_list_head list;
 	sctp_listen_link_info_t *listener;
 	struct knet_link *link;
 	struct sockaddr_storage dst_address;
 	int connect_sock;
 	int on_rx_epoll;
 	int close_sock;
 	int sock_shutdown;
 } sctp_connect_link_info_t;
 
 /*
  * socket handling functions
  *
  * those functions do NOT perform locking. locking
  * should be handled in the right context from callers
  */
 
 /*
  * sockets are removed from rx_epoll from callers
  * see also error handling functions
  */
 static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	struct epoll_event ev;
 	sctp_connect_link_info_t *info = kn_link->transport_link;
 
 	if (info->connect_sock != -1) {
 		if (info->on_rx_epoll) {
 			memset(&ev, 0, sizeof(struct epoll_event));
 			ev.events = EPOLLIN;
 			ev.data.fd = info->connect_sock;
 			if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
 				savederrno = errno;
 				err = -1;
 				log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
 				strerror(savederrno));
 				goto exit_error;
 			}
 			info->on_rx_epoll = 0;
 		}
 
 		if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
 			savederrno = errno;
 			err = -1;
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 				strerror(savederrno));
 		} else {
 			close(info->connect_sock);
 			info->connect_sock = -1;
 		}
 	}
 
 exit_error:
 	errno = savederrno;
 	return err;
 }
 
 static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
 {
 	int err = 0, savederrno = 0;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS,
 		       handle_info->event_subscribe_buffer,
 		       handle_info->event_subscribe_kernel_size) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
 			type, strerror(savederrno));
 	}
 
 	errno = savederrno;
 	return err;
 }
 
 static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
 {
 	int err = 0, savederrno = 0;
 	int value;
 	int level;
 
 #ifdef SOL_SCTP
 	level = SOL_SCTP;
 #else
 	level = IPPROTO_SCTP;
 #endif
 
 	if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	value = 1;
 	if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set reuseaddr on socket %d: %s",
 			sock, strerror(savederrno));
 		goto exit_error;
 	}
 
 	value = 1;
 	if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
 		savederrno = errno;
 		err = -1;
 	}
 
 exit_error:
 	errno = savederrno;
 	return err;
 }
 
 static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	sctp_connect_link_info_t *info = kn_link->transport_link;
 
 	if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) {
 		savederrno = errno;
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP socket %d received error: %s", info->connect_sock, strerror(savederrno));
 		if ((savederrno != EALREADY) && (savederrno != EINPROGRESS) && (savederrno != EISCONN)) {
 			err = -1;
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
 				info->connect_sock, strerror(savederrno));
 		}
 	}
 
 	errno = savederrno;
 	return err;
 }
 
 static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	struct epoll_event ev;
 	sctp_connect_link_info_t *info = kn_link->transport_link;
 	int connect_sock;
 	struct sockaddr_storage connect_addr;
 
 	connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
 	if (connect_sock < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	memset(&connect_addr, 0, sizeof(struct sockaddr_storage));
 	if (knet_strtoaddr(kn_link->status.src_ipaddr, "0", &connect_addr, sockaddr_len(&connect_addr)) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to resolve connecting socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 
 	}
 
 	if (bind(connect_sock, (struct sockaddr *)&connect_addr, sockaddr_len(&connect_addr)) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind connecting socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = connect_sock;
 	if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
 			strerror(errno));
 	}
 	info->on_rx_epoll = 1;
 
 	info->connect_sock = connect_sock;
 	info->close_sock = 0;
 	kn_link->outsock = info->connect_sock;
 
 	if (_reconnect_socket(knet_h, kn_link) < 0) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 exit_error:
 	if (err) {
 		if (connect_sock >= 0) {
 			close(connect_sock);
 		}
 	}
 	errno = savederrno;
 	return err;
 }
 
 static void _lock_sleep_relock(knet_handle_t knet_h)
 {
 	int i = 0;
 
 	/* Don't hold onto the lock while sleeping */
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	while (i < 5) {
 		usleep(KNET_THREADS_TIMERES / 16);
 		if (!pthread_rwlock_rdlock(&knet_h->global_rwlock)) {
 			/*
 			 * lock acquired, we can go out
 			 */
 			return;
 		} else {
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get read lock!");
 			i++;
 		}
 	}
 	/*
 	 * time to crash! if we cannot re-acquire the lock
 	 * there is no easy way out of this one
 	 */
 	assert(0);
 }
 
 int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
 {
 	sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 	sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 	sctp_listen_link_info_t *listen_info;
 
 	if (recv_err < 0) {
 		switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
 			case SCTP_CONNECT_LINK_INFO:
 				if (connect_info->link->transport_connected == 0) {
 					return -1;
 				}
 				break;
 			case SCTP_ACCEPTED_LINK_INFO:
 				listen_info = accepted_info->link_info;
 				if (listen_info->listen_sock != sockfd) {
 					if (listen_info->on_rx_epoll == 0) {
 						return -1;
 					}
 				}
 				break;
 		}
 		if (recv_errno == EAGAIN) {
 #ifdef DEBUG
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
 #endif
 			_lock_sleep_relock(knet_h);
 			return 1;
 		}
 		return -1;
 	}
 	return 0;
 }
 
 /*
  * socket error management functions
  *
  * both called with global read lock.
  *
  * NOTE: we need to remove the fd from the epoll as soon as possible
  *       even before we notify the respective thread to take care of it
  *       because scheduling can make it so that this thread will overload
  *       and the threads supposed to take care of the error will never
  *       be able to take action.
  *       we CANNOT handle FDs here directly (close/reconnect/etc) due
  *       to locking context. We need to delegate that to their respective
  *       management threads within the global write lock.
  *
  * this function is called from:
  * - RX thread with recv_err <= 0 directly on recvmmsg error
  * - transport_rx_is_data when msg_len == 0 (recv_err = 1)
  * - transport_rx_is_data on notification (recv_err = 2)
  *
  * basically this small abuse of recv_err is to detect notifications
  * generated by sockets created by listen().
  */
 int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
 {
 	struct epoll_event ev;
 	sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 	sctp_listen_link_info_t *listen_info;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
 		case SCTP_CONNECT_LINK_INFO:
 			/*
 			 * all connect link have notifications enabled
 			 * and we accept only data from notification and
 			 * generic recvmmsg errors.
 			 *
 			 * Errors generated by msg_len 0 can be ignored because
 			 * they follow a notification (double notification)
 			 */
 			if (recv_err != 1) {
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
 				if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
 					log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
 				}
 			}
 			break;
 		case SCTP_ACCEPTED_LINK_INFO:
 			listen_info = accepted_info->link_info;
 			if (listen_info->listen_sock != sockfd) {
 				if (recv_err != 1) {
 					if (listen_info->on_rx_epoll) {
 						memset(&ev, 0, sizeof(struct epoll_event));
 						ev.events = EPOLLIN;
 						ev.data.fd = sockfd;
 						if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
 							log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
 							strerror(errno));
 							return -1;
 						}
 						listen_info->on_rx_epoll = 0;
 					}
 					log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
 					if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
 					}
 				}
 			} else {
 				/*
 				 * this means the listen() socket has generated
 				 * a notification. now what? :-)
 				 */
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
 			}
 			break;
 		default:
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
 			break;
 	}
 	/*
 	 * Under RX pressure we need to give time to IPC to pick up the message
 	 */
 
 	_lock_sleep_relock(knet_h);
 	return 0;
 }
 
 /*
  * NOTE: sctp_transport_rx_is_data is called with global rdlock
  *       delegate any FD error management to sctp_transport_rx_sock_error
  *       and keep this code to parsing incoming data only
  */
 int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
 {
 	size_t i;
 	struct iovec *iov = msg->msg_hdr.msg_iov;
 	size_t iovlen = msg->msg_hdr.msg_iovlen;
 	struct sctp_assoc_change *sac;
 	union sctp_notification  *snp;
 	sctp_accepted_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 	sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 
 	if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
 		if (msg->msg_len == 0) {
 			/*
 			 * NOTE: with event notification enabled, we receive error twice:
 			 *       1) from the event notification
 			 *       2) followed by a 0 byte msg_len
 			 *
 			 * the event handler should take care to avoid #2 by stopping
 			 * the rx thread from processing more packets than necessary.
 			 */
 			if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
 				if (connect_info->sock_shutdown) {
 					return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
 				}
 			} else {
 				if (listen_info->link_info->sock_shutdown) {
 					return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
 				}
 			}
 			/*
 			 * this is pretty much dead code and we should never hit it.
 			 * keep it for safety and avoid the rx thread to process
 			 * bad info / data.
 			 */
 			return KNET_TRANSPORT_RX_NOT_DATA_STOP;
 		}
 		/*
 		 * missing MSG_EOR has to be treated as a short read
 		 * from the socket and we need to fill in the mread buf
 		 * while we wait for MSG_EOR
 		 */
 		if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
 			/*
 			 * copy the incoming data into mread_buf + mread_len (incremental)
 			 * and increase mread_len
 			 */
 			memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
 			listen_info->mread_len = listen_info->mread_len + msg->msg_len;
 			return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE;
 		}
 		/*
 		 * got EOR.
 		 * if mread_len is > 0 we are completing a packet from short reads
 		 * complete reassembling the packet in mread_buf, copy it back in the iov
 		 * and set the iov/msg len numbers (size) correctly
 		 */
 		if (listen_info->mread_len) {
 			/*
 			 * add last fragment to mread_buf
 			 */
 			memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
 			listen_info->mread_len = listen_info->mread_len + msg->msg_len;
 			/*
 			 * move all back into the iovec
 			 */
 			memmove(iov->iov_base, listen_info->mread_buf, listen_info->mread_len);
 			msg->msg_len = listen_info->mread_len;
 			listen_info->mread_len = 0;
 		}
 		return KNET_TRANSPORT_RX_IS_DATA;
 	}
 
 	if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
 		return KNET_TRANSPORT_RX_NOT_DATA_STOP;
 	}
 
 	for (i = 0; i < iovlen; i++) {
 		snp = iov[i].iov_base;
 
 		switch (snp->sn_header.sn_type) {
 			case SCTP_ASSOC_CHANGE:
 				sac = &snp->sn_assoc_change;
 				switch (sac->sac_state) {
 					case SCTP_COMM_LOST:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_lost", sockfd);
 						if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
 							connect_info->close_sock = 1;
 							connect_info->link->transport_connected = 0;
 						}
 						sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
 						return KNET_TRANSPORT_RX_OOB_DATA_STOP;
 						break;
 					case SCTP_COMM_UP:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_up", sockfd);
 						if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
 							connect_info->link->transport_connected = 1;
 						}
 						break;
 					case SCTP_RESTART:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: restart", sockfd);
 						break;
 					case SCTP_SHUTDOWN_COMP:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: shutdown comp", sockfd);
 						if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
 							connect_info->close_sock = 1;
 						}
 						sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
 						return KNET_TRANSPORT_RX_OOB_DATA_STOP;
 						break;
 					case SCTP_CANT_STR_ASSOC:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: cant str assoc", sockfd);
 						sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
 						break;
 					default:
 						log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: unknown %d", sockfd, sac->sac_state);
 						break;
 				}
 				break;
 			case SCTP_SHUTDOWN_EVENT:
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event socket %d", sockfd);
 				if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
 					connect_info->link->transport_connected = 0;
 					connect_info->sock_shutdown = 1;
 				} else {
 					listen_info->link_info->sock_shutdown = 1;
 				}
 				break;
 			case SCTP_SEND_FAILED:
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed socket: %d", sockfd);
 				break;
 			case SCTP_PEER_ADDR_CHANGE:
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change socket %d", sockfd);
 				break;
 			case SCTP_REMOTE_ERROR:
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error socket %d", sockfd);
 				break;
 			default:
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event socket: %d type: %hu", sockfd, snp->sn_header.sn_type);
 				break;
 		}
 	}
 	return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
 }
 
 int sctp_transport_link_is_down(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	sctp_connect_link_info_t *info = kn_link->transport_link;
 
 	kn_link->transport_connected = 0;
 	info->close_sock = 1;
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received a link down event", info->connect_sock);
 	if (sendto(handle_info->connectsockfd[1], &info->connect_sock, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
 	}
 
 	return 0;
 }
 
 /*
  * connect / outgoing socket management thread
  */
 
 /*
  * _handle_connected_sctp* are called with a global write lock
  * from the connect_thread
  */
 static void _handle_connected_sctp_socket(knet_handle_t knet_h, int connect_sock)
 {
 	int err;
 	unsigned int status, len = sizeof(status);
 	sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
 	struct knet_link *kn_link = info->link;
 
 	if (info->close_sock) {
 		if (_close_connect_socket(knet_h, kn_link) < 0) {
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp_socket: %s", connect_sock, strerror(errno));
 			return;
 		}
 		info->close_sock = 0;
 		if (_create_connect_socket(knet_h, kn_link) < 0) {
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
 			return;
 		}
 	}
 
 	_reconnect_socket(knet_h, info->link);
 
 	err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
 	if (err) {
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
 			connect_sock, strerror(errno));
 		return;
 	}
 
 	if (status) {
 		log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
 			 connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
 			 strerror(status));
 
 		/*
 		 * No need to create a new socket if connect failed,
 		 * just retry connect
 		 */
 		return;
 	}
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
 		  connect_sock,
 		  kn_link->status.dst_ipaddr, kn_link->status.dst_port);
 }
 
 static void _handle_connected_sctp_notifications(knet_handle_t knet_h)
 {
 	int sockfd = -1;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
 		return;
 	}
 
 	if (_is_valid_fd(knet_h, sockfd) < 1) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
 		return;
 	}
 
 	/*
 	 * revalidate sockfd
 	 */
 	if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
 		return;
 	}
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd);
 
 	_handle_connected_sctp_socket(knet_h, sockfd);
 }
 
 static void *_sctp_connect_thread(void *data)
 {
 	int savederrno;
 	int i, nev;
 	knet_handle_t knet_h = (knet_handle_t) data;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
 	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED);
 
+	memset(&events, 0, sizeof(events));
+
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
 
 		/*
 		 * we use timeout to detect if thread is shutting down
 		 */
 		if (nev == 0) {
 			continue;
 		}
 
 		if (nev < 0) {
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
 				  strerror(errno));
 			continue;
 		}
 
 		/*
 		 * Sort out which FD has a connection
 		 */
 		savederrno = get_global_wrlock(knet_h);
 		if (savederrno) {
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
 				strerror(savederrno));
 			continue;
 		}
 
 		/*
 		 * minor optimization: deduplicate events
 		 *
 		 * in some cases we can receive multiple notifcations
 		 * of the same FD having issues or need handling.
 		 * It's enough to process it once even tho it's safe
 		 * to handle them multiple times.
 		 */
 		for (i = 0; i < nev; i++) {
 			if (events[i].data.fd == handle_info->connectsockfd[0]) {
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
 				_handle_connected_sctp_notifications(knet_h);
 			} else {
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification on connected sockfd %d\n", events[i].data.fd);
 			}
 		}
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 		/*
 		 * this thread can generate events for itself.
 		 * we need to sleep in between loops to allow other threads
 		 * to be scheduled
 		 */
 		usleep(knet_h->reconnect_int * 1000);
 	}
 
 	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED);
 
 	return NULL;
 }
 
 /*
  * listen/incoming connections management thread
  */
 
 /*
  * Listener received a new connection
  * called with a write lock from main thread
  */
 static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
 {
 	int err = 0, savederrno = 0;
 	int new_fd;
 	int i = -1;
 	sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
 	struct epoll_event ev;
 	struct sockaddr_storage ss;
 	socklen_t sock_len = sizeof(ss);
 	char addr_str[KNET_MAX_HOST_LEN];
 	char port_str[KNET_MAX_PORT_LEN];
 	sctp_accepted_link_info_t *accept_info = NULL;
 
 	new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
 	if (new_fd < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno));
 		goto exit_error;
 	}
 
 	if (knet_addrtostr(&ss, sizeof(ss),
 			   addr_str, KNET_MAX_HOST_LEN,
 			   port_str, KNET_MAX_PORT_LEN) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
 		goto exit_error;
 	}
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
 						addr_str, port_str);
 	if (knet_h->use_access_lists) {
 		if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) {
 			savederrno = EINVAL;
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str);
 			close(new_fd);
 			errno = savederrno;
 			return;
 		}
 	}
 
 	/*
 	 * Keep a track of all accepted FDs
 	 */
 	for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
 		if (info->accepted_socks[i] == -1) {
 			info->accepted_socks[i] = new_fd;
 			break;
 		}
 	}
 
 	if (i == MAX_ACCEPTED_SOCKS) {
 		errno = EBUSY;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
 		goto exit_error;
 	}
 
 	if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	accept_info = malloc(sizeof(sctp_accepted_link_info_t));
 	if (!accept_info) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 	memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));
 
 	accept_info->link_info = info;
 
 	if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, accept_info) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 			strerror(errno));
 		goto exit_error;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = new_fd;
 	if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
 			new_fd, strerror(errno));
 		goto exit_error;
 	}
 	info->on_rx_epoll = 1;
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
 		  new_fd, addr_str, port_str, info->listen_sock, i);
 
 exit_error:
 	if (err) {
 		if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) {
 			info->accepted_socks[i] = -1;
 		}
 		_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
 		free(accept_info);
 		if (new_fd >= 0) {
 			close(new_fd);
 		}
 	}
 	errno = savederrno;
 	return;
 }
 
 /*
  * Listen thread received a notification of a bad socket that needs closing
  * called with a write lock from main thread
  */
 static void _handle_listen_sctp_errors(knet_handle_t knet_h)
 {
 	int sockfd = -1;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	sctp_accepted_link_info_t *accept_info;
 	sctp_listen_link_info_t *info;
 	struct knet_host *host;
 	int link_idx;
 	int i;
 
 	if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
 		return;
 	}
 
 	if (_is_valid_fd(knet_h, sockfd) < 1) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
 		return;
 	}
 
 	/*
 	 * revalidate sockfd
 	 */
 	if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
 		return;
 	}
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);
 
 	accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
 	info = accept_info->link_info;
 
 	/*
 	 * clear all links using this accepted socket as
 	 * outbound dynamically connected socket
 	 */
 
 	for (host = knet_h->host_head; host != NULL; host = host->next) {
 		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
 			if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
 			    (host->link[link_idx].outsock == sockfd)) {
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
 					  host->host_id, link_idx, sockfd);
 				host->link[link_idx].status.dynconnected = 0;
 				host->link[link_idx].transport_connected = 0;
 				host->link[link_idx].outsock = 0;
 				memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
 			}
 		}
 	}
 
 	for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
 		if (sockfd == info->accepted_socks[i]) {
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
 			_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL);
 			info->accepted_socks[i] = -1;
 			free(accept_info);
 			close(sockfd);
 			break; /* Keeps covscan happy */
 		}
 	}
 }
 
 static void *_sctp_listen_thread(void *data)
 {
 	int savederrno;
 	int i, nev;
 	knet_handle_t knet_h = (knet_handle_t) data;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
 	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED);
 
+	memset(&events, 0, sizeof(events));
+
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
 
 		/*
 		 * we use timeout to detect if thread is shutting down
 		 */
 		if (nev == 0) {
 			continue;
 		}
 
 		if (nev < 0) {
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
 				  strerror(errno));
 			continue;
 		}
 
 		savederrno = get_global_wrlock(knet_h);
 		if (savederrno) {
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
 				strerror(savederrno));
 			continue;
 		}
 		/*
 		 * Sort out which FD has an incoming connection
 		 */
 		for (i = 0; i < nev; i++) {
 			if (events[i].data.fd == handle_info->listensockfd[0]) {
 				log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
 				_handle_listen_sctp_errors(knet_h);
 			} else {
 				if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
 					_handle_incoming_sctp(knet_h, events[i].data.fd);
 				} else {
 					log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
 				}
 			}
 
 		}
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 	}
 
 	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED);
 
 	return NULL;
 }
 
 /*
  * sctp_link_listener_start/stop are called in global write lock
  * context from set_config and clear_config.
  */
 static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	int listen_sock = -1;
 	struct epoll_event ev;
 	sctp_listen_link_info_t *info = NULL;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	/*
 	 * Only allocate a new listener if src address is different
 	 */
 	qb_list_for_each_entry(info, &handle_info->listen_links_list, list) {
 		if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
 			if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1,
 				       &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
 				return NULL;
 			}
 			return info;
 		}
 	}
 
 	info = malloc(sizeof(sctp_listen_link_info_t));
 	if (!info) {
 		err = -1;
 		goto exit_error;
 	}
 
 	memset(info, 0, sizeof(sctp_listen_link_info_t));
 
 	memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
 	memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
 
 	listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
 	if (listen_sock < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (listen(listen_sock, 5) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, info) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1,
 		       &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = listen_sock;
 	if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 	info->on_listener_epoll = 1;
 
 	info->listen_sock = listen_sock;
 	qb_list_add(&info->list, &handle_info->listen_links_list);
 
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);
 
 exit_error:
 	if (err) {
 		if ((info) && (info->on_listener_epoll)) {
 			epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
 		}
 		if (listen_sock >= 0) {
 			check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP);
 			close(listen_sock);
 		}
 		if (info) {
 			free(info);
 			info = NULL;
 		}
 	}
 	errno = savederrno;
 	return info;
 }
 
 static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	int found = 0, i;
 	struct knet_host *host;
 	int link_idx;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
 	sctp_listen_link_info_t *info = this_link_info->listener;
 	sctp_connect_link_info_t *link_info;
 	struct epoll_event ev;
 
 	for (host = knet_h->host_head; host != NULL; host = host->next) {
 		for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
 			if (&host->link[link_idx] == kn_link)
 				continue;
 
 			link_info = host->link[link_idx].transport_link;
 			if ((link_info) &&
 			    (link_info->listener == info)) {
 				found = 1;
 				break;
 			}
 		}
 	}
 
 	if ((check_rm(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP,
 		      &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", info->listen_sock);
 	}
 
 	if (found) {
 		this_link_info->listener = NULL;
 		log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock);
 		savederrno = EBUSY;
 		err = -1;
 		goto exit_error;
 	}
 
 	if (info->on_listener_epoll) {
 		memset(&ev, 0, sizeof(struct epoll_event));
 		ev.events = EPOLLIN;
 		ev.data.fd = info->listen_sock;
 		if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) {
 			savederrno = errno;
 			err = -1;
 			log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
 				strerror(savederrno));
 			goto exit_error;
 		}
 		info->on_listener_epoll = 0;
 	}
 
 	if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	check_rmall(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP);
 
 	close(info->listen_sock);
 
 	for (i=0; i< MAX_ACCEPTED_SOCKS; i++) {
 		if (info->accepted_socks[i] > -1) {
 			memset(&ev, 0, sizeof(struct epoll_event));
 			ev.events = EPOLLIN;
 			ev.data.fd = info->accepted_socks[i];
 			if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) {
 				log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
 					strerror(errno));
 			}
 			info->on_rx_epoll = 0;
 			free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data);
 			close(info->accepted_socks[i]);
 			if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) {
 				savederrno = errno;
 				err = -1;
 				log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
 					strerror(savederrno));
 				goto exit_error;
 			}
 			info->accepted_socks[i] = -1;
 		}
 	}
 
 	qb_list_del(&info->list);
 	free(info);
 	this_link_info->listener = NULL;
 
 exit_error:
 	errno = savederrno;
 	return err;
 }
 
 /*
  * Links config/clear. Both called with global wrlock from link_set_config/clear_config
  */
 int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int savederrno = 0, err = 0;
 	sctp_connect_link_info_t *info;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	info = malloc(sizeof(sctp_connect_link_info_t));
 	if (!info) {
 		goto exit_error;
 	}
 
 	memset(info, 0, sizeof(sctp_connect_link_info_t));
 
 	kn_link->transport_link = info;
 	info->link = kn_link;
 
 	memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
 	info->connect_sock = -1;
 
 	info->listener = sctp_link_listener_start(knet_h, kn_link);
 	if (!info->listener) {
 		savederrno = errno;
 		err = -1;
 		goto exit_error;
 	}
 
 	if (kn_link->dynamic == KNET_LINK_STATIC) {
 		if (_create_connect_socket(knet_h, kn_link) < 0) {
 			savederrno = errno;
 			err = -1;
 			goto exit_error;
 		}
 		kn_link->outsock = info->connect_sock;
 	}
 
 	qb_list_add(&info->list, &handle_info->connect_links_list);
 
 exit_error:
 	if (err) {
 		if (info) {
 			if (info->connect_sock >= 0) {
 				close(info->connect_sock);
 			}
 			if (info->listener) {
 				sctp_link_listener_stop(knet_h, kn_link);
 			}
 			kn_link->transport_link = NULL;
 			free(info);
 		}
 	}
 	errno = savederrno;
 	return err;
 }
 
 /*
  * called with global wrlock
  */
 int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	int err = 0, savederrno = 0;
 	sctp_connect_link_info_t *info;
 
 	if (!kn_link) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	info = kn_link->transport_link;
 
 	if (!info) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener transport: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	if (_close_connect_socket(knet_h, kn_link) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
 			strerror(savederrno));
 		goto exit_error;
 	}
 
 	qb_list_del(&info->list);
 
 	free(info);
 	kn_link->transport_link = NULL;
 
 exit_error:
 	errno = savederrno;
 	return err;
 }
 
 /*
  * transport_free and transport_init are
  * called only from knet_handle_new and knet_handle_free.
  * all resources (hosts/links) should have been already freed at this point
  * and they are called in a write locked context, hence they
  * don't need their own locking.
  */
 
 int sctp_transport_free(knet_handle_t knet_h)
 {
 	sctp_handle_info_t *handle_info;
 	void *thread_status;
 	struct epoll_event ev;
 
 	if (!knet_h->transports[KNET_TRANSPORT_SCTP]) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 
 	/*
 	 * keep it here while we debug list usage and such
 	 */
 	if (!qb_list_empty(&handle_info->listen_links_list)) {
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
 	}
 	if (!qb_list_empty(&handle_info->connect_links_list)) {
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
 	}
 
 	if (handle_info->listen_thread) {
 		pthread_cancel(handle_info->listen_thread);
 		pthread_join(handle_info->listen_thread, &thread_status);
 	}
 
 	if (handle_info->connect_thread) {
 		pthread_cancel(handle_info->connect_thread);
 		pthread_join(handle_info->connect_thread, &thread_status);
 	}
 
 	if (handle_info->listensockfd[0] >= 0) {
 		memset(&ev, 0, sizeof(struct epoll_event));
 		ev.events = EPOLLIN;
 		ev.data.fd = handle_info->listensockfd[0];
 		epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev);
 	}
 
 	if (handle_info->connectsockfd[0] >= 0) {
 		memset(&ev, 0, sizeof(struct epoll_event));
 		ev.events = EPOLLIN;
 		ev.data.fd = handle_info->connectsockfd[0];
 		epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev);
 	}
 
 	_close_socketpair(knet_h, handle_info->connectsockfd);
 	_close_socketpair(knet_h, handle_info->listensockfd);
 
 	if (handle_info->listen_epollfd >= 0) {
 		close(handle_info->listen_epollfd);
 	}
 
 	if (handle_info->connect_epollfd >= 0) {
 		close(handle_info->connect_epollfd);
 	}
 
 	free(handle_info->event_subscribe_buffer);
 	free(handle_info);
 	knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;
 
 	return 0;
 }
 
 static int _sctp_subscribe_init(knet_handle_t knet_h)
 {
 	int test_socket, savederrno;
 	sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
 	char dummy_events[100];
 	struct sctp_event_subscribe *events;
 	/* Below we set the first 6 fields of this expanding struct.
 	 * SCTP_EVENTS is deprecated, but SCTP_EVENT is not available
 	 * on Linux; on the other hand, FreeBSD and old Linux does not
 	 * accept small transfers, so we can't simply use this minimum
 	 * everywhere.  Thus we query and store the native size. */
 	const unsigned int subscribe_min = 6;
 
 	test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP);
 	if (test_socket < 0) {
 		if (errno == EPROTONOSUPPORT) {
 			log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization");
 			return 0;
 		}
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s",
 			strerror(savederrno));
 		return savederrno;
 	}
 	handle_info->event_subscribe_kernel_size = sizeof dummy_events;
 	if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events,
 		       &handle_info->event_subscribe_kernel_size)) {
 		close(test_socket);
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s",
 			strerror(savederrno));
 		return savederrno;
 	}
 	close(test_socket);
 	if (handle_info->event_subscribe_kernel_size < subscribe_min) {
 		savederrno = ERANGE;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP,
 			"No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed",
 			handle_info->event_subscribe_kernel_size, subscribe_min);
 		return savederrno;
 	}
 	events = malloc(handle_info->event_subscribe_kernel_size);
 	if (!events) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP,
 			"Failed to allocate event subscribe buffer: %s", strerror(savederrno));
 		return savederrno;
 	}
 	memset(events, 0, handle_info->event_subscribe_kernel_size);
 	events->sctp_data_io_event = 1;
 	events->sctp_association_event = 1;
 	events->sctp_address_event = 1;
 	events->sctp_send_failure_event = 1;
 	events->sctp_peer_error_event = 1;
 	events->sctp_shutdown_event = 1;
 	handle_info->event_subscribe_buffer = (char *)events;
 	log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space",
 		  handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe));
 	return 0;
 }
 
 int sctp_transport_init(knet_handle_t knet_h)
 {
 	int err = 0, savederrno = 0;
 	sctp_handle_info_t *handle_info;
 	struct epoll_event ev;
 
 	if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
 		errno = EEXIST;
 		return -1;
 	}
 
 	handle_info = malloc(sizeof(sctp_handle_info_t));
 	if (!handle_info) {
 		return -1;
 	}
 
 	memset(handle_info, 0,sizeof(sctp_handle_info_t));
 
 	knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;
 
 	savederrno = _sctp_subscribe_init(knet_h);
 	if (savederrno) {
 		err = -1;
 		goto exit_fail;
 	}
 
 	qb_list_init(&handle_info->listen_links_list);
 	qb_list_init(&handle_info->connect_links_list);
 
 	handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
 	if (handle_info->listen_epollfd < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
 			strerror(savederrno));
 		goto exit_fail;
         }
 
 	if (_fdset_cloexec(handle_info->listen_epollfd)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
         if (handle_info->connect_epollfd < 0) {
                 savederrno = errno;
 		err = -1;
                 log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
                         strerror(savederrno));
                 goto exit_fail;
         }
 
 	if (_fdset_cloexec(handle_info->connect_epollfd)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = handle_info->connectsockfd[0];
 	if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = handle_info->listensockfd[0];
 	if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	/*
 	 * Start connect & listener threads
 	 */
 	set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED);
 	savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
 	if (savederrno) {
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED);
 	savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
 	if (savederrno) {
 		err = -1;
 		log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 exit_fail:
 	if (err < 0) {
 		sctp_transport_free(knet_h);
 	}
 	errno = savederrno;
 	return err;
 }
 
 int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
 {
 	kn_link->outsock = sockfd;
 	kn_link->status.dynconnected = 1;
 	kn_link->transport_connected = 1;
 	return 0;
 }
 
 int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
 {
 	sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
 	sctp_listen_link_info_t *info = this_link_info->listener;
 	return info->listen_sock;
 }
 #endif