diff --git a/configure.ac b/configure.ac
index 732d082a..f96161f4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,413 +1,418 @@
 #
 # Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
 #
 # Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
 #          Federico Simoncelli <fsimon@kronosnet.org>
 #
 # This software licensed under GPL-2.0+, LGPL-2.0+
 #
 
 #                                               -*- Autoconf -*-
 # Process this file with autoconf to produce a configure script.
 #
 
 AC_PREREQ([2.63])
 AC_INIT([kronosnet],
 	m4_esyscmd([build-aux/git-version-gen .tarball-version]),
 	[devel@lists.kronosnet.org])
 AC_USE_SYSTEM_EXTENSIONS
 AM_INIT_AUTOMAKE([1.11.1 dist-bzip2 dist-xz color-tests -Wno-portability subdir-objects])
 LT_PREREQ([2.2.6])
 LT_INIT
 
 AC_CONFIG_MACRO_DIR([m4])
 AC_CONFIG_SRCDIR([kronosnetd/main.c])
 AC_CONFIG_HEADERS([config.h])
 
 AC_CANONICAL_HOST
 AC_PROG_LIBTOOL
 
 AC_LANG([C])
 
 systemddir=${prefix}/lib/systemd/system
 
 if test "$prefix" = "NONE"; then
 	prefix="/usr"
 	if test "$localstatedir" = "\${prefix}/var"; then
 		localstatedir="/var"
 	fi
 	if test "$sysconfdir" = "\${prefix}/etc"; then
 		sysconfdir="/etc"
 	fi
 	if test "$systemddir" = "NONE/lib/systemd/system"; then
 		systemddir=/lib/systemd/system
 	fi
 	if test "$libdir" = "\${exec_prefix}/lib"; then
 		if test -e /usr/lib64; then
 			libdir="/usr/lib64"
 		else
 			libdir="/usr/lib"
 		fi
 	fi
 fi
 
 # Checks for programs.
 if ! ${MAKE-make} --version /cannot/make/this >/dev/null 2>&1; then
 	AC_MSG_ERROR(["you don't seem to have GNU make; it is required"])
 fi
 
 AC_PROG_AWK
 AC_PROG_GREP
 AC_PROG_SED
 AC_PROG_CPP
 AC_PROG_CC
 AC_PROG_CC_C99
 if test "x$ac_cv_prog_cc_c99" = "xno"; then
 	AC_MSG_ERROR(["C99 support is required"])
 fi
 AC_PROG_LN_S
 AC_PROG_INSTALL
 AC_PROG_MAKE_SET
 AC_PROG_CXX
 AC_PROG_RANLIB
 AC_CHECK_PROGS([PUBLICAN], [publican], [:])
 AC_CHECK_PROGS([PKGCONFIG], [pkg-config])
 
 AC_ARG_ENABLE([poc],
         [  --disable-poc                     : avoid building poc code ],,
         [ enable_poc="yes" ])
 AM_CONDITIONAL([BUILD_POC], [test x$enable_poc = xyes])
 
 AC_ARG_ENABLE([kronosnetd],
         [  --enable-kronosnetd               : Kronosnetd support ],,
         [ enable_kronosnetd="no" ])
 AM_CONDITIONAL([BUILD_KRONOSNETD], [test x$enable_kronosnetd = xyes])
 
 AC_ARG_ENABLE([libtap],
         [  --enable-libtap                   : libtap support ],,
         [ enable_libtap="no" ])
 
 if test "x$enable_kronosnetd" = xyes; then
    enable_libtap=yes
 fi
 AM_CONDITIONAL([BUILD_LIBTAP], [test x$enable_libtap = xyes])
 
 AC_ARG_ENABLE([libknet-sctp],
         [  --disable-libknet-sctp            : avoid libknet SCTP support ],,
         [ enable_libknet_sctp="yes" ])
 
 ## local helper functions
 # this function checks if CC support options passed as
 # args. Global CFLAGS are ignored during this test.
 cc_supports_flag() {
 	saveCPPFLAGS="$CPPFLAGS"
 	CPPFLAGS="$@"
 	if echo $CC | grep -q clang; then
 		CPPFLAGS="-Werror $CPPFLAGS"
 	fi
 	AC_MSG_CHECKING([whether $CC supports "$@"])
 	AC_PREPROC_IFELSE([AC_LANG_PROGRAM([])],
 			  [RC=0; AC_MSG_RESULT([yes])],
 			  [RC=1; AC_MSG_RESULT([no])])
 	CPPFLAGS="$saveCPPFLAGS"
 	return $RC
 }
 
 # helper macro to check libs without adding them to LIBS
 check_lib_no_libs() {
 	lib_no_libs_arg1=$1
 	shift
 	lib_no_libs_arg2=$1
 	shift
 	lib_no_libs_args=$@
 	AC_CHECK_LIB([$lib_no_libs_arg1],
 		     [$lib_no_libs_arg2],,,
 		     [$lib_no_libs_args])
         LIBS=$ac_check_lib_save_LIBS
 }
 
 # Checks for C features
 AC_C_INLINE
 
 # Checks for libraries.
 AC_CHECK_LIB([pthread], [pthread_create])
 AC_CHECK_LIB([m], [ceil])
 AC_CHECK_LIB([rt], [clock_gettime])
 
+# crypto libraries checks
 PKG_CHECK_MODULES([nss],[nss])
 
+# compress libraries checks
+# zlib is cheap because it's pulled in by nss
+PKG_CHECK_MODULES([zlib], [zlib])
+
 # Checks for header files.
 AC_CHECK_HEADERS([fcntl.h])
 AC_CHECK_HEADERS([stdlib.h])
 AC_CHECK_HEADERS([string.h])
 AC_CHECK_HEADERS([strings.h])
 AC_CHECK_HEADERS([sys/ioctl.h])
 AC_CHECK_HEADERS([syslog.h])
 AC_CHECK_HEADERS([unistd.h])
 AC_CHECK_HEADERS([netinet/in.h])
 AC_CHECK_HEADERS([sys/socket.h])
 AC_CHECK_HEADERS([arpa/inet.h])
 AC_CHECK_HEADERS([netdb.h])
 AC_CHECK_HEADERS([limits.h])
 AC_CHECK_HEADERS([stdint.h])
 AC_CHECK_HEADERS([sys/epoll.h])
 
 if test "x$enable_libknet_sctp" = xyes; then
 	AC_CHECK_HEADERS([netinet/sctp.h],, AC_MSG_ERROR(["missing required SCTP headers"]))
 fi
 
 # Checks for typedefs, structures, and compiler characteristics.
 AC_C_INLINE
 AC_TYPE_SIZE_T
 AC_TYPE_PID_T
 AC_TYPE_SSIZE_T
 AC_TYPE_UINT8_T
 AC_TYPE_UINT16_T
 AC_TYPE_UINT32_T
 AC_TYPE_UINT64_T
 AC_TYPE_INT32_T
 
 # Checks for library functions.
 AC_FUNC_ALLOCA
 AC_FUNC_FORK
 AC_FUNC_MALLOC
 AC_FUNC_REALLOC
 AC_CHECK_FUNCS([memset])
 AC_CHECK_FUNCS([strdup])
 AC_CHECK_FUNCS([strerror])
 AC_CHECK_FUNCS([dup2])
 AC_CHECK_FUNCS([select])
 AC_CHECK_FUNCS([socket])
 AC_CHECK_FUNCS([inet_ntoa])
 AC_CHECK_FUNCS([memmove])
 AC_CHECK_FUNCS([strchr])
 AC_CHECK_FUNCS([atexit])
 AC_CHECK_FUNCS([ftruncate])
 AC_CHECK_FUNCS([strrchr])
 AC_CHECK_FUNCS([strstr])
 AC_CHECK_FUNCS([clock_gettime])
 AC_CHECK_FUNCS([strcasecmp])
 AC_CHECK_FUNCS([kevent])
 # if neither sys/epoll.h nor kevent are present, we should fail.
 
 if test "x$ac_cv_header_sys_epoll_h" = xno && test "x$ac_cv_func_kevent" = xno; then
 	AC_MSG_ERROR([Both epoll and kevent unavailable on this OS])
 fi
 
 if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes; then
 	AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code])
 fi
 
 # checks (for kronosnetd)
 if test "x$enable_kronosnetd" = xyes; then
 
 AC_CHECK_HEADERS([security/pam_appl.h],
 		 [AC_CHECK_LIB([pam], [pam_start])],
 		 [AC_MSG_ERROR([Unable to find LinuxPAM devel files])])
 
 AC_CHECK_HEADERS([security/pam_misc.h],
 	        [AC_CHECK_LIB([pam_misc], [misc_conv])],
                 [AC_MSG_ERROR([Unable to find LinuxPAM MISC devel files])])
 
 PKG_CHECK_MODULES([libqb], [libqb])
 
 AC_CHECK_LIB([qb], [qb_log_thread_priority_set],
 	     [have_qb_log_thread_priority_set="yes"],
              [have_qb_log_thread_priority_set="no"])
 
 if test "x${have_qb_log_thread_priority_set}" = xyes; then
 	AC_DEFINE_UNQUOTED([HAVE_QB_LOG_THREAD_PRIORITY_SET], [1], [have qb_log_thread_priority_set])
 fi
 
 fi
 
 # local options
 AC_ARG_ENABLE([debug],
 	[  --enable-debug          enable debug build. ],
 	[ default="no" ])
 
 AC_ARG_ENABLE([publicandocs],
 	[  --enable-publicandocs   enable docs build. ],
 	[ default="no" ])
 
 AC_ARG_WITH([initdefaultdir],
 	[  --with-initdefaultdir   : path to /etc/sysconfig/.. or /etc/default dir. ],
 	[ INITDEFAULTDIR="$withval" ],
 	[ INITDEFAULTDIR="$sysconfdir/default" ])
 
 AC_ARG_WITH([initddir],
 	[  --with-initddir=DIR     : path to init script directory. ],
 	[ INITDDIR="$withval" ],
 	[ INITDDIR="$sysconfdir/init.d" ])
 
 AC_ARG_WITH([systemddir],
 	[  --with-systemddir=DIR   : path to systemd unit files directory. ],
 	[ SYSTEMDDIR="$withval" ],
 	[ SYSTEMDDIR="$systemddir" ])
 
 AC_ARG_WITH([syslogfacility],
 	[  --with-syslogfacility=FACILITY
 			  default syslog facility. ],
 	[ SYSLOGFACILITY="$withval" ],
 	[ SYSLOGFACILITY="LOG_DAEMON" ])
 
 AC_ARG_WITH([sysloglevel],
 	[  --with-sysloglevel=LEVEL
 			  default syslog level. ],
 	[ SYSLOGLEVEL="$withval" ],
 	[ SYSLOGLEVEL="LOG_INFO" ])
 
 AC_ARG_WITH([defaultadmgroup],
 	[  --with-defaultadmgroup=GROUP
 			  define PAM group. Users part of this group will be
 			  allowed to configure kronosnet. Others will only
 			  receive read-only rights. ],
 	[ DEFAULTADMGROUP="$withval" ],
 	[ DEFAULTADMGROUP="kronosnetadm" ])
 
 ## random vars
 LOGDIR=${localstatedir}/log/
 RUNDIR=${localstatedir}/run/
 DEFAULT_CONFIG_DIR=${sysconfdir}/kronosnet
 
 ## do subst
 
 AM_CONDITIONAL([BUILD_DOCS], [test "x${enable_publicandocs}" = xyes])
 AM_CONDITIONAL([DEBUG], [test "x${enable_debug}" = xyes])
 
 AC_SUBST([DEFAULT_CONFIG_DIR])
 AC_SUBST([INITDEFAULTDIR])
 AC_SUBST([INITDDIR])
 AC_SUBST([SYSTEMDDIR])
 AC_SUBST([LOGDIR])
 AC_SUBST([DEFAULTADMGROUP])
 
 AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_DIR],
 		   ["$(eval echo ${DEFAULT_CONFIG_DIR})"],
 		   [Default config directory])
 
 AC_DEFINE_UNQUOTED([DEFAULT_CONFIG_FILE],
 		   ["$(eval echo ${DEFAULT_CONFIG_DIR}/kronosnetd.conf)"],
 		   [Default config file])
 
 AC_DEFINE_UNQUOTED([LOGDIR],
 		   ["$(eval echo ${LOGDIR})"],
 		   [Default logging directory])
 
 AC_DEFINE_UNQUOTED([DEFAULT_LOG_FILE],
 		   ["$(eval echo ${LOGDIR}/kronosnetd.log)"],
 		   [Default log file])
 
 AC_DEFINE_UNQUOTED([RUNDIR],
 		   ["$(eval echo ${RUNDIR})"],
 		   [Default run directory])
 
 AC_DEFINE_UNQUOTED([SYSLOGFACILITY],
 		   [$(eval echo ${SYSLOGFACILITY})],
 		   [Default syslog facility])
 
 AC_DEFINE_UNQUOTED([SYSLOGLEVEL],
 		   [$(eval echo ${SYSLOGLEVEL})],
 		   [Default syslog level])
 
 AC_DEFINE_UNQUOTED([DEFAULTADMGROUP],
 		   ["$(eval echo ${DEFAULTADMGROUP})"],
 		   [Default admin group])
 
 ## *FLAGS handling
 ENV_CFLAGS="$CFLAGS"
 ENV_CPPFLAGS="$CPPFLAGS"
 ENV_LDFLAGS="$LDFLAGS"
 
 # debug build stuff
 if test "x${enable_debug}" = xyes; then
 	AC_DEFINE_UNQUOTED([DEBUG], [1], [Compiling Debugging code])
 	OPT_CFLAGS="-O0"
 else
 	OPT_CFLAGS="-O3"
 fi
 
 # gdb flags
 if test "x${GCC}" = xyes; then
 	GDB_FLAGS="-ggdb3"
 else
 	GDB_FLAGS="-g"
 fi
 
 # extra warnings
 EXTRA_WARNINGS=""
 
 WARNLIST="
 	all
 	extra
 	unused
 	shadow
 	missing-prototypes
 	missing-declarations
 	suggest-attribute=noreturn
 	suggest-attribute=format
 	strict-prototypes
 	declaration-after-statement
 	pointer-arith
 	write-strings
 	cast-align
 	bad-function-cast
 	missing-format-attribute
 	float-equal
 	format=2
 	format-signedness
 	format-security
 	format-nonliteral
 	no-long-long
 	unsigned-char
 	gnu89-inline
 	no-strict-aliasing
 	error
 	address
 	cpp
 	overflow
 	parentheses
 	sequence-point
 	switch
 	shift-overflow=2
 	overlength-strings
 	retundent-decls
 	init-self
 	uninitialized
 	unused-but-set-variable
 	unused-function
 	unused-result
 	unused-value
 	unused-variable
 	unknown-pragmas
 	no-unused-parameter
 	"
 
 for j in $WARNLIST; do
 	if cc_supports_flag -W$j; then
 		EXTRA_WARNINGS="$EXTRA_WARNINGS -W$j";
 	fi
 done
 
 CFLAGS="$ENV_CFLAGS $lt_prog_compiler_pic $OPT_CFLAGS $GDB_FLAGS \
 	$EXTRA_WARNINGS $WERROR_CFLAGS"
 CPPFLAGS="$ENV_CPPFLAGS"
 LDFLAGS="$ENV_LDFLAGS $lt_prog_compiler_pic -Wl,--as-needed"
 
 AC_CONFIG_FILES([
 		Makefile
 		init/Makefile
 		libtap/Makefile
 		libtap/libtap.pc
 		kronosnetd/Makefile
 		kronosnetd/kronosnetd.logrotate
 		libknet/Makefile
 		libknet/libknet.pc
 		libknet/tests/Makefile
 		docs/Makefile
 		poc-code/Makefile
 		poc-code/iov-hash/Makefile
 		poc-code/access-list/Makefile
 		])
 
 AC_OUTPUT
diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index 29719ec7..c8067f08 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,83 +1,88 @@
 #
 # Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
 #
 # Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
 #          Federico Simoncelli <fsimon@kronosnet.org>
 #
 # This software licensed under GPL-2.0+, LGPL-2.0+
 #
 
 MAINTAINERCLEANFILES	= Makefile.in
 
 include $(top_srcdir)/build-aux/check.mk
 
 SYMFILE			= libknet_exported_syms
 
 EXTRA_DIST		= $(SYMFILE)
 
 SUBDIRS			= . tests
 
 libversion		= 0:0:0
 
 # override global LIBS that pulls in lots of craft we don't need here
 LIBS			=
 
 sources			= \
 			  common.c \
 			  compat.c \
+			  compress.c \
+			  compress_zlib.c \
 			  crypto.c \
 			  handle.c \
 			  host.c \
 			  link.c \
 			  logging.c \
 			  netutils.c \
 			  nsscrypto.c \
 			  threads_common.c \
 			  threads_dsthandler.c \
 			  threads_heartbeat.c \
 			  threads_pmtud.c \
 			  threads_rx.c \
 			  threads_tx.c \
 			  transport_udp.c \
 			  transport_sctp.c \
 			  transport_loopback.c \
 			  transport_common.c
 
 include_HEADERS		= libknet.h
 
 pkgconfigdir		= $(libdir)/pkgconfig
 
 pkgconfig_DATA		= libknet.pc
 
 noinst_HEADERS		= \
 			  common.h \
 			  compat.h \
+			  compress.h \
+			  compress_zlib.h \
 			  crypto.h \
 			  host.h \
 			  internals.h \
 			  link.h \
 			  logging.h \
 			  netutils.h \
 			  nsscrypto.h \
 			  onwire.h \
 			  threads_common.h \
 			  threads_dsthandler.h \
 			  threads_heartbeat.h \
 			  threads_pmtud.h \
 			  threads_rx.h \
 			  threads_tx.h \
 			  transports.h
 
 lib_LTLIBRARIES		= libknet.la
 
 libknet_la_SOURCES	= $(sources)
 
-libknet_la_CFLAGS	= $(nss_CFLAGS)
+libknet_la_CFLAGS	= $(nss_CFLAGS) $(zlib_CFLAGS)
 
 EXTRA_libknet_la_DEPENDENCIES	= $(SYMFILE)
 
 libknet_la_LDFLAGS	= -Wl,--version-script=$(srcdir)/$(SYMFILE) \
 			  --export-dynamic \
 			  -version-number $(libversion)
 
-libknet_la_LIBADD	= $(nss_LIBS) -lrt -lpthread -lm
+libknet_la_LIBADD	= $(nss_LIBS) $(zlib_LIBS) \
+			  -lrt -lpthread -lm
diff --git a/libknet/compress.c b/libknet/compress.c
new file mode 100644
index 00000000..a12f9392
--- /dev/null
+++ b/libknet/compress.c
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
+ *
+ * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+
+#include "internals.h"
+#include "compress.h"
+#include "logging.h"
+#include "compress_zlib.h"
+
+/*
+ * internal module switch data
+ */
+
+/*
+ * DO NOT CHANGE ORDER HERE OR ONWIRE COMPATIBILITY
+ * WILL BREAK!
+ *
+ * add after zlib and before NULL/NULL/NULL.
+ */
+
+compress_model_t compress_modules_cmds[] = {
+	{ "none", NULL, NULL, NULL },
+	{ "zlib", zlib_val_level, zlib_compress, zlib_decompress },
+        { NULL, NULL, NULL, NULL },
+};
+
+static int get_model(const char *model)
+{
+	int idx = 0;
+
+	while (compress_modules_cmds[idx].model_name != NULL) {
+		if (!strcmp(compress_modules_cmds[idx].model_name, model))
+			return idx;
+		idx++;
+	}
+	return -1;
+}
+
+static int get_max_model(void)
+{
+	int idx = 0;
+	while (compress_modules_cmds[idx].model_name != NULL) {
+		idx++;
+	}
+	return idx - 1;
+}
+
+static int val_level(
+	knet_handle_t knet_h,
+	int compress_model,
+	int compress_level)
+{
+	return compress_modules_cmds[compress_model].val_level(knet_h, compress_level);
+}
+
+int compress_init(
+	knet_handle_t knet_h,
+	struct knet_handle_compress_cfg *knet_handle_compress_cfg)
+{
+	int cmp_model;
+
+	knet_h->compress_max_model = get_max_model();
+	if (!knet_handle_compress_cfg) {
+		return 0;
+	}
+
+	log_debug(knet_h, KNET_SUB_COMPRESS,
+		  "Initizializing compress module [%s/%d]",
+		  knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level);
+
+	cmp_model = get_model(knet_handle_compress_cfg->compress_model);
+	if (cmp_model < 0) {
+		log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (cmp_model > 0) {
+		if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
+			log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
+				knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
+			errno = EINVAL;
+			return -1;
+		}
+
+	}
+
+	knet_h->compress_model = cmp_model;
+	knet_h->compress_level = knet_handle_compress_cfg->compress_level;
+
+	return 0;
+}
+
+int compress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len)
+{
+	return compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
+}
+
+int decompress(
+	knet_handle_t knet_h,
+	int compress_model,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len)
+{
+	return compress_modules_cmds[compress_model].decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
+}
diff --git a/libknet/compress.h b/libknet/compress.h
new file mode 100644
index 00000000..cb835396
--- /dev/null
+++ b/libknet/compress.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
+ *
+ * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __KNET_COMPRESS_H__
+#define __KNET_COMPRESS_H__
+
+#include "internals.h"
+
+typedef struct {
+	const char	*model_name;
+	int (*val_level)(knet_handle_t knet_h,
+			 int compress_level);
+	int (*compress)	(knet_handle_t knet_h,
+			 const unsigned char *buf_in,
+			 const ssize_t buf_in_len,
+			 unsigned char *buf_out,
+			 ssize_t *buf_out_len);
+	int (*decompress)(knet_handle_t knet_h,
+			 const unsigned char *buf_in,
+			 const ssize_t buf_in_len,
+			 unsigned char *buf_out,
+			 ssize_t *buf_out_len);
+} compress_model_t;
+
+int compress_init(
+	knet_handle_t knet_h,
+	struct knet_handle_compress_cfg *knet_handle_compress_cfg);
+
+int compress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len);
+
+int decompress(
+	knet_handle_t knet_h,
+	int compress_model,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len);
+
+#endif
diff --git a/libknet/compress_zlib.c b/libknet/compress_zlib.c
new file mode 100644
index 00000000..0256e081
--- /dev/null
+++ b/libknet/compress_zlib.c
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
+ *
+ * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <zlib.h>
+
+#include "internals.h"
+#include "compress_zlib.h"
+#include "logging.h"
+
+int zlib_val_level(
+	knet_handle_t knet_h,
+	int compress_level)
+{
+	if (compress_level < 0) {
+		log_err(knet_h, KNET_SUB_ZLIBCOMP, "zlib does not support negative compression level %d",
+			 compress_level);
+		return -1;
+	}
+	if (compress_level > 9) {
+		log_err(knet_h, KNET_SUB_ZLIBCOMP, "zlib does not support compression level higher than 9");
+		return -1;
+	}
+	if (compress_level == 0) {
+		log_warn(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress level 0 does NOT perform any compression");
+	}
+	return 0;
+}
+
+int zlib_compress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len)
+{
+	int zerr = 0, err = 0;
+	int savederrno = 0;
+	uLongf destLen = *buf_out_len;
+
+	zerr = compress2(buf_out, &destLen,
+			 buf_in, buf_in_len,
+			 knet_h->compress_level);
+
+	*buf_out_len = destLen;
+
+	switch(zerr) {
+		case Z_OK:
+			err = 0;
+			savederrno = 0;
+			break;
+		case Z_MEM_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress mem error");
+			err = -1; 
+			savederrno = ENOMEM;
+			break;
+		case Z_BUF_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress buf error");
+			err = -1;
+			savederrno = ENOBUFS;
+			break;
+		case Z_STREAM_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress stream error");
+			err = -1;
+			savederrno = EINVAL;
+			break;
+		default:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib compress unknown error");
+			break;
+	}
+
+	errno = savederrno;
+	return err;
+}
+
+int zlib_decompress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len)
+{
+	int zerr = 0, err = 0;
+	int savederrno = 0;
+	uLongf destLen = *buf_out_len;
+
+	zerr = uncompress(buf_out, &destLen,
+			  buf_in, buf_in_len);
+
+	*buf_out_len = destLen;
+
+	switch(zerr) {
+		case Z_OK:
+			err = 0;
+			savederrno = 0;
+			break;
+		case Z_MEM_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress mem error");
+			err = -1;
+			savederrno = ENOMEM;
+			break;
+		case Z_BUF_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress buf error");
+			err = -1;
+			savederrno = ENOBUFS;
+			break;
+		case Z_DATA_ERROR:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib decompress data error");
+			err = -1;
+			savederrno = EINVAL;
+			break;
+		default:
+			log_debug(knet_h, KNET_SUB_ZLIBCOMP, "zlib unknown error");
+			break;
+	}
+
+	errno = savederrno;
+	return err;
+}
diff --git a/libknet/compress_zlib.h b/libknet/compress_zlib.h
new file mode 100644
index 00000000..4bd21017
--- /dev/null
+++ b/libknet/compress_zlib.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
+ *
+ * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#ifndef __KNET_COMPRESS_ZLIB_H__
+#define __KNET_COMPRESS_ZLIB H__
+
+#include "internals.h"
+
+int zlib_val_level(
+	knet_handle_t knet_h,
+	int compress_level);
+
+int zlib_compress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len);
+
+int zlib_decompress(
+	knet_handle_t knet_h,
+	const unsigned char *buf_in,
+	const ssize_t buf_in_len,
+	unsigned char *buf_out,
+	ssize_t *buf_out_len);
+
+#endif
diff --git a/libknet/handle.c b/libknet/handle.c
index e538b835..4a84a77c 100644
--- a/libknet/handle.c
+++ b/libknet/handle.c
@@ -1,1428 +1,1485 @@
 /*
  * Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <errno.h>
 #include <pthread.h>
 #include <sys/uio.h>
 #include <math.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 
 #include "internals.h"
 #include "crypto.h"
+#include "compress.h"
 #include "compat.h"
 #include "common.h"
 #include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_pmtud.h"
 #include "threads_dsthandler.h"
 #include "threads_rx.h"
 #include "threads_tx.h"
 #include "transports.h"
 #include "logging.h"
 
 static pthread_mutex_t handle_config_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static int _init_locks(knet_handle_t knet_h)
 {
 	int savederrno = 0;
 
 	savederrno = pthread_rwlock_init(&knet_h->global_rwlock, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize list rwlock: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	knet_h->lock_init_done = 1;
 
 	savederrno = pthread_mutex_init(&knet_h->pmtud_mutex, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud mutex: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_cond_init(&knet_h->pmtud_cond, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize pmtud conditional mutex: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_mutex_init(&knet_h->hb_mutex, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize hb_thread mutex: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_mutex_init(&knet_h->tx_mutex, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_thread mutex: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_mutex_init(&knet_h->tx_seq_num_mutex, NULL);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize tx_seq_num_mutex mutex: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	return 0;
 
 exit_fail:
 	errno = savederrno;
 	return -1;
 }
 
 static void _destroy_locks(knet_handle_t knet_h)
 {
 	knet_h->lock_init_done = 0;
 	pthread_rwlock_destroy(&knet_h->global_rwlock);
 	pthread_mutex_destroy(&knet_h->pmtud_mutex);
 	pthread_cond_destroy(&knet_h->pmtud_cond);
 	pthread_mutex_destroy(&knet_h->hb_mutex);
 	pthread_mutex_destroy(&knet_h->tx_mutex);
 	pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
 }
 
 static int _init_socks(knet_handle_t knet_h)
 {
 	int savederrno = 0;
 
 	if (_init_socketpair(knet_h, knet_h->hostsockfd)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal hostsockpair: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	if (_init_socketpair(knet_h, knet_h->dstsockfd)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize internal dstsockpair: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	return 0;
 
 exit_fail:
 	errno = savederrno;
 	return -1;
 }
 
 static void _close_socks(knet_handle_t knet_h)
 {
 	_close_socketpair(knet_h, knet_h->dstsockfd);
 	_close_socketpair(knet_h, knet_h->hostsockfd);
 }
 
 static int _init_buffers(knet_handle_t knet_h)
 {
 	int savederrno = 0;
 	int i;
 	size_t bufsize;
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE;
 		knet_h->send_to_links_buf[i] = malloc(bufsize);
 		if (!knet_h->send_to_links_buf[i]) {
 			savederrno = errno;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory datafd to link buffer: %s",
 				strerror(savederrno));
 			goto exit_fail;
 		}
 		memset(knet_h->send_to_links_buf[i], 0, bufsize);
 	}
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		knet_h->recv_from_links_buf[i] = malloc(KNET_DATABUFSIZE);
 		if (!knet_h->recv_from_links_buf[i]) {
 			savederrno = errno;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for link to datafd buffer: %s",
 				strerror(savederrno));
 			goto exit_fail;
 		}
 		memset(knet_h->recv_from_links_buf[i], 0, KNET_DATABUFSIZE);
 	}
 
 	knet_h->recv_from_sock_buf = malloc(KNET_DATABUFSIZE);
 	if (!knet_h->recv_from_sock_buf) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for app to datafd buffer: %s",
 				strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->recv_from_sock_buf, 0, KNET_DATABUFSIZE);
 
 	knet_h->pingbuf = malloc(KNET_HEADER_PING_SIZE);
 	if (!knet_h->pingbuf) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for hearbeat buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->pingbuf, 0, KNET_HEADER_PING_SIZE);
 
 	knet_h->pmtudbuf = malloc(KNET_PMTUD_SIZE_V6);
 	if (!knet_h->pmtudbuf) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for pmtud buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->pmtudbuf, 0, KNET_PMTUD_SIZE_V6);
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		bufsize = ceil((float)KNET_MAX_PACKET_SIZE / (i + 1)) + KNET_HEADER_ALL_SIZE + KNET_DATABUFSIZE_CRYPT_PAD;
 		knet_h->send_to_links_buf_crypt[i] = malloc(bufsize);
 		if (!knet_h->send_to_links_buf_crypt[i]) {
 			savederrno = errno;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto datafd to link buffer: %s",
 				strerror(savederrno));
 			goto exit_fail;
 		}
 		memset(knet_h->send_to_links_buf_crypt[i], 0, bufsize);
 	}
 
 	knet_h->recv_from_links_buf_decrypt = malloc(KNET_DATABUFSIZE_CRYPT);
 	if (!knet_h->recv_from_links_buf_decrypt) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->recv_from_links_buf_decrypt, 0, KNET_DATABUFSIZE_CRYPT);
 
 	knet_h->recv_from_links_buf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
 	if (!knet_h->recv_from_links_buf_crypt) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto link to datafd buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->recv_from_links_buf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
 
 	knet_h->pingbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
 	if (!knet_h->pingbuf_crypt) {
 		savederrno = errno; 
 		log_err(knet_h, KNET_SUB_CRYPTO, "Unable to allocate memory for crypto hearbeat buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->pingbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
 
 	knet_h->pmtudbuf_crypt = malloc(KNET_DATABUFSIZE_CRYPT);
 	if (!knet_h->pmtudbuf_crypt) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for crypto pmtud buffer: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	memset(knet_h->pmtudbuf_crypt, 0, KNET_DATABUFSIZE_CRYPT);
 
+	knet_h->recv_from_links_buf_decompress = malloc(KNET_DATABUFSIZE_COMPRESS);
+	if (!knet_h->recv_from_links_buf_decompress) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for decompress buffer: %s",
+			strerror(savederrno));
+		goto exit_fail;
+	}
+	memset(knet_h->recv_from_links_buf_decompress, 0, KNET_DATABUFSIZE_COMPRESS);
+
+	knet_h->send_to_links_buf_compress = malloc(KNET_DATABUFSIZE_COMPRESS);
+	if (!knet_h->send_to_links_buf_compress) {
+		savederrno = errno;
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to allocate memory for compress buffer: %s",
+			strerror(savederrno));
+		goto exit_fail;
+	}
+	memset(knet_h->send_to_links_buf_compress, 0, KNET_DATABUFSIZE_COMPRESS);
+
 	memset(knet_h->knet_transport_fd_tracker, KNET_MAX_TRANSPORTS, sizeof(knet_h->knet_transport_fd_tracker));
 
 	return 0;
 
 exit_fail:
 	errno = savederrno;
 	return -1;
 }
 
 static void _destroy_buffers(knet_handle_t knet_h)
 {
 	int i;
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		free(knet_h->send_to_links_buf[i]);
 		free(knet_h->send_to_links_buf_crypt[i]);
 	}
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		free(knet_h->recv_from_links_buf[i]);
 	}
 
+	free(knet_h->recv_from_links_buf_decompress);
+	free(knet_h->send_to_links_buf_compress);
 	free(knet_h->recv_from_sock_buf);
 	free(knet_h->recv_from_links_buf_decrypt);
 	free(knet_h->recv_from_links_buf_crypt);
 	free(knet_h->pingbuf);
 	free(knet_h->pingbuf_crypt);
 	free(knet_h->pmtudbuf);
 	free(knet_h->pmtudbuf_crypt);
 }
 
 static int _init_epolls(knet_handle_t knet_h)
 {
 	struct epoll_event ev;
 	int savederrno = 0;
 
 	/*
 	 * even if the kernel does dynamic allocation with epoll_ctl
 	 * we need to reserve one extra for host to host communication
 	 */
 	knet_h->send_to_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
 	if (knet_h->send_to_links_epollfd < 0) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll datafd to link fd: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	knet_h->recv_from_links_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
 	if (knet_h->recv_from_links_epollfd < 0) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll link to datafd fd: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	knet_h->dst_link_handler_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS);
 	if (knet_h->dst_link_handler_epollfd < 0) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to create epoll dst cache fd: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	if (_fdset_cloexec(knet_h->send_to_links_epollfd)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd to link epoll fd: %s",
 			strerror(savederrno)); 
 		goto exit_fail;
 	}
 
 	if (_fdset_cloexec(knet_h->recv_from_links_epollfd)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on link to datafd epoll fd: %s",
 			strerror(savederrno)); 
 		goto exit_fail;
 	}
 
 	if (_fdset_cloexec(knet_h->dst_link_handler_epollfd)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on dst cache epoll fd: %s",
 			strerror(savederrno)); 
 		goto exit_fail;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = knet_h->hostsockfd[0];
 
 	if (epoll_ctl(knet_h->send_to_links_epollfd,
 		      EPOLL_CTL_ADD, knet_h->hostsockfd[0], &ev)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add hostsockfd[0] to epoll pool: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = knet_h->dstsockfd[0];
 
 	if (epoll_ctl(knet_h->dst_link_handler_epollfd,
 		      EPOLL_CTL_ADD, knet_h->dstsockfd[0], &ev)) {
 		savederrno = errno;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add dstsockfd[0] to epoll pool: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	return 0;
 
 exit_fail:
 	errno = savederrno;
 	return -1;
 }
 
 static void _close_epolls(knet_handle_t knet_h)
 {
 	struct epoll_event ev;
 	int i;
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 
 	for (i = 0; i < KNET_DATAFD_MAX; i++) {
 		if (knet_h->sockfd[i].in_use) {
 			epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created], &ev);
 			if  (knet_h->sockfd[i].sockfd[knet_h->sockfd[i].is_created]) {
 				 _close_socketpair(knet_h, knet_h->sockfd[i].sockfd);
 			}
 		}
 	}
 
 	epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->hostsockfd[0], &ev);
 	epoll_ctl(knet_h->dst_link_handler_epollfd, EPOLL_CTL_DEL, knet_h->dstsockfd[0], &ev);
 	close(knet_h->send_to_links_epollfd);
 	close(knet_h->recv_from_links_epollfd);
 	close(knet_h->dst_link_handler_epollfd);
 }
 
 static int _start_transports(knet_handle_t knet_h)
 {
 	int i, savederrno = 0, err = 0;
 
 	for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
 		switch (i) {
 			case KNET_TRANSPORT_UDP:
 				knet_h->transport_ops[i] = get_udp_transport();
 				break;
 			case KNET_TRANSPORT_SCTP:
 				knet_h->transport_ops[i] = get_sctp_transport();
 				break;
 			case KNET_TRANSPORT_LOOPBACK:
 				knet_h->transport_ops[i] = get_loopback_transport();
 				break;
 		}
 
 		if (knet_h->transport_ops[i]) {
 			if (knet_h->transport_ops[i]->transport_init(knet_h) < 0) {
 				savederrno = errno;
 				log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s",
 					knet_h->transport_ops[i]->transport_name,
 					strerror(savederrno));
 				err = -1;
 				goto out;
 			}
 		}
 	}
 
 out:
 	errno = savederrno;
 	return err;
 }
 
 static void _stop_transports(knet_handle_t knet_h)
 {
 	int i;
 
 	for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
 		if (knet_h->transport_ops[i]) {
 			knet_h->transport_ops[i]->transport_free(knet_h);
 		}
 	}
 }
 
 static int _start_threads(knet_handle_t knet_h)
 {
 	int savederrno = 0;
 
 	savederrno = pthread_create(&knet_h->pmtud_link_handler_thread, 0,
 				    _handle_pmtud_link_thread, (void *) knet_h);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start pmtud link thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_create(&knet_h->dst_link_handler_thread, 0,
 				    _handle_dst_link_handler_thread, (void *) knet_h);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start dst cache thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_create(&knet_h->send_to_links_thread, 0,
 				    _handle_send_to_links_thread, (void *) knet_h);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start datafd to link thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_create(&knet_h->recv_from_links_thread, 0,
 				    _handle_recv_from_links_thread, (void *) knet_h);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start link to datafd thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 
 	savederrno = pthread_create(&knet_h->heartbt_thread, 0,
 				    _handle_heartbt_thread, (void *) knet_h);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to start heartbeat thread: %s",
 			strerror(savederrno));
 		goto exit_fail;
 	}
 	return 0;
 
 exit_fail:
 	errno = savederrno;
 	return -1;
 }
 
 static void _stop_threads(knet_handle_t knet_h)
 {
 	void *retval;
 
 	/*
 	 * allow threads to catch on shutdown request
 	 * and release locks before we stop them.
 	 * this isn't the most efficent way to handle it
 	 * but it works good enough for now
 	 */
 
 	sleep(1);
 
 	if (knet_h->heartbt_thread) {
 		pthread_cancel(knet_h->heartbt_thread);
 		pthread_join(knet_h->heartbt_thread, &retval);
 	}
 
 	if (knet_h->send_to_links_thread) {
 		pthread_cancel(knet_h->send_to_links_thread);
 		pthread_join(knet_h->send_to_links_thread, &retval);
 	}
 
 	if (knet_h->recv_from_links_thread) {
 		pthread_cancel(knet_h->recv_from_links_thread);
 		pthread_join(knet_h->recv_from_links_thread, &retval);
 	}
 
 	if (knet_h->dst_link_handler_thread) {
 		pthread_cancel(knet_h->dst_link_handler_thread);
 		pthread_join(knet_h->dst_link_handler_thread, &retval);
 	}
 
 	pthread_mutex_lock(&knet_h->pmtud_mutex);
 	pthread_cond_signal(&knet_h->pmtud_cond);
 	pthread_mutex_unlock(&knet_h->pmtud_mutex);
 
 	sleep(1);
 
 	if (knet_h->pmtud_link_handler_thread) {
 		pthread_cancel(knet_h->pmtud_link_handler_thread);
 		pthread_join(knet_h->pmtud_link_handler_thread, &retval);
 	}
 }
 
 knet_handle_t knet_handle_new(knet_node_id_t host_id,
 			      int	     log_fd,
 			      uint8_t	     default_log_level)
 {
 	knet_handle_t knet_h;
 	int savederrno = 0;
 	struct rlimit cur;
 
 	if (getrlimit(RLIMIT_NOFILE, &cur) < 0) {
 		return NULL;
 	}
 
 	if ((log_fd < 0) || ((unsigned int)log_fd >= cur.rlim_max)) {
 		errno = EINVAL;
 		return NULL;
 	}
 
 	/*
 	 * validate incoming request
 	 */
 
 	if ((log_fd) && (default_log_level > KNET_LOG_DEBUG)) {
 		errno = EINVAL;
 		return NULL;
 	}
 
 	/*
 	 * allocate handle
 	 */
 
 	knet_h = malloc(sizeof(struct knet_handle));
 	if (!knet_h) {
 		errno = ENOMEM;
 		return NULL;
 	}
 	memset(knet_h, 0, sizeof(struct knet_handle));
 
 	savederrno = pthread_mutex_lock(&handle_config_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		goto exit_fail;
 	}
 
 	/*
 	 * copy config in place
 	 */
 
 	knet_h->host_id = host_id;
 	knet_h->logfd = log_fd;
 	if (knet_h->logfd > 0) {
 		memset(&knet_h->log_levels, default_log_level, KNET_MAX_SUBSYSTEMS);
 	}
 
 	/*
 	 * set pmtud default timers
 	 */
 	knet_h->pmtud_interval = KNET_PMTUD_DEFAULT_INTERVAL;
 
 	/*
 	 * set transports reconnect default timers
 	 */
 	knet_h->reconnect_int = KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL;
 
 	/*
 	 * init main locking structures
 	 */
 
 	if (_init_locks(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
 	/*
 	 * init sockets
 	 */
 
 	if (_init_socks(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
 	/*
 	 * allocate packet buffers
 	 */
 
 	if (_init_buffers(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
+	if (compress_init(knet_h, NULL)) {
+		savederrno = EINVAL;
+		goto exit_fail;
+	}
+
 	/*
 	 * create epoll fds
 	 */
 
 	if (_init_epolls(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
 	/*
 	 * start transports
 	 */
 
 	if (_start_transports(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
 	/*
 	 * start internal threads
 	 */
 
 	if (_start_threads(knet_h)) {
 		savederrno = errno;
 		goto exit_fail;
 	}
 
 	pthread_mutex_unlock(&handle_config_mutex);
 	return knet_h;
 
 exit_fail:
 	pthread_mutex_unlock(&handle_config_mutex);
 	knet_handle_free(knet_h);
 	errno = savederrno;
 	return NULL;
 }
 
 int knet_handle_free(knet_handle_t knet_h)
 {
 	int savederrno = 0;
 
 	savederrno = pthread_mutex_lock(&handle_config_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get handle mutex lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h) {
 		pthread_mutex_unlock(&handle_config_mutex);
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!knet_h->lock_init_done) {
 		goto exit_nolock;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		pthread_mutex_unlock(&handle_config_mutex);
 		errno = savederrno;
 		return -1;
 	}
 
 	if (knet_h->host_head != NULL) {
 		savederrno = EBUSY;
 		log_err(knet_h, KNET_SUB_HANDLE,
 			"Unable to free handle: host(s) or listener(s) are still active: %s",
 			strerror(savederrno));
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 		pthread_mutex_unlock(&handle_config_mutex);
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->fini_in_progress = 1;
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	_stop_threads(knet_h);
 	_stop_transports(knet_h);
 	_close_epolls(knet_h);
 	_destroy_buffers(knet_h);
 	_close_socks(knet_h);
 	crypto_fini(knet_h);
 
 	_destroy_locks(knet_h);
 
 exit_nolock:
 	free(knet_h);
 	knet_h = NULL;
 	pthread_mutex_unlock(&handle_config_mutex);
 	return 0;
 }
 
 int knet_handle_enable_sock_notify(knet_handle_t knet_h,
 				   void *sock_notify_fn_private_data,
 				   void (*sock_notify_fn) (
 						void *private_data,
 						int datafd,
 						int8_t channel,
 						uint8_t tx_rx,
 						int error,
 						int errorno))
 {
 	int savederrno = 0, err = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!sock_notify_fn) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
 	knet_h->sock_notify_fn = sock_notify_fn;
 	log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return err;
 }
 
 int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
 {
 	int err = 0, savederrno = 0;
 	int i;
 	struct epoll_event ev;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (datafd == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (*channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sock_notify_fn) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	if (*datafd > 0) {
 		for (i = 0; i < KNET_DATAFD_MAX; i++) {
 			if  ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
 				log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
 				savederrno = EEXIST;
 				err = -1;
 				goto out_unlock;
 			}
 		}
 	}
 
 	/*
 	 * auto allocate a channel
 	 */
 	if (*channel < 0) {
 		for (i = 0; i < KNET_DATAFD_MAX; i++) {
 			if (!knet_h->sockfd[i].in_use) {
 				*channel = i;
 				break;
 			}
 		}
 		if (*channel < 0) {
 			savederrno = EBUSY;
 			err = -1;
 			goto out_unlock;
 		}
 	} else {
 		if (knet_h->sockfd[*channel].in_use) {
 			savederrno = EBUSY;
 			err = -1;
 			goto out_unlock;
 		}
 	}
 
 	knet_h->sockfd[*channel].is_created = 0;
 	knet_h->sockfd[*channel].is_socket = 0;
 	knet_h->sockfd[*channel].has_error = 0;
 
 	if (*datafd > 0) {
 		int sockopt;
 		socklen_t sockoptlen = sizeof(sockopt);
 
 		if (_fdset_cloexec(*datafd)) {
 			savederrno = errno;
 			err = -1;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
 				strerror(savederrno));
 			goto out_unlock;
 		}
 
 		if (_fdset_nonblock(*datafd)) {
 			savederrno = errno;
 			err = -1;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
 				strerror(savederrno));
 			goto out_unlock;
 		}
 
 		knet_h->sockfd[*channel].sockfd[0] = *datafd;
 		knet_h->sockfd[*channel].sockfd[1] = 0;
 
 		if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
 			knet_h->sockfd[*channel].is_socket = 1;
 		}
 	} else {
 		if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
 			savederrno = errno;
 			err = -1;
 			goto out_unlock;
 		}
 
 		knet_h->sockfd[*channel].is_created = 1;
 		knet_h->sockfd[*channel].is_socket = 1;
 		*datafd = knet_h->sockfd[*channel].sockfd[0];
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
 
 	if (epoll_ctl(knet_h->send_to_links_epollfd,
 		      EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
 		savederrno = errno;
 		err = -1;
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
 			knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
 		if (knet_h->sockfd[*channel].is_created) {
 			_close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
 		}
 		goto out_unlock;
 	}
 
 	knet_h->sockfd[*channel].in_use = 1;
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
 int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
 {
 	int err = 0, savederrno = 0;
 	int8_t channel = -1;
 	int i;
 	struct epoll_event ev;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (datafd <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	for (i = 0; i < KNET_DATAFD_MAX; i++) {
 		if  ((knet_h->sockfd[i].in_use) &&
 		     (knet_h->sockfd[i].sockfd[0] == datafd)) {
 			channel = i;
 			break;
 		}
 	}
 
 	if (channel < 0) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	if (!knet_h->sockfd[channel].has_error) {
 		memset(&ev, 0, sizeof(struct epoll_event));
 
 		if (epoll_ctl(knet_h->send_to_links_epollfd,
 			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
 			savederrno = errno;
 			err = -1;
 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
 				knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
 			goto out_unlock;
 		}
 	}
 
 	if (knet_h->sockfd[channel].is_created) {
 		_close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
 	}
 
 	memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
 int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
 {
 	int err = 0, savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (datafd == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	*datafd = knet_h->sockfd[channel].sockfd[0];
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
 int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
 {
 	int err = 0, savederrno = 0;
 	int i;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (datafd <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	*channel = -1;
 
 	for (i = 0; i < KNET_DATAFD_MAX; i++) {
 		if  ((knet_h->sockfd[i].in_use) &&
 		     (knet_h->sockfd[i].sockfd[0] == datafd)) {
 			*channel = i;
 			break;
 		}
 	}
 
 	if (*channel < 0) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
 int knet_handle_enable_filter(knet_handle_t knet_h,
 			      void *dst_host_filter_fn_private_data,
 			      int (*dst_host_filter_fn) (
 					void *private_data,
 					const unsigned char *outdata,
 					ssize_t outdata_len,
 					uint8_t tx_rx,
 					knet_node_id_t this_host_id,
 					knet_node_id_t src_node_id,
 					int8_t *channel,
 					knet_node_id_t *dst_host_ids,
 					size_t *dst_host_ids_entries))
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
 	knet_h->dst_host_filter_fn = dst_host_filter_fn;
 	if (knet_h->dst_host_filter_fn) {
 		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
 	} else {
 		log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
 	}
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (enabled > 1) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->enabled = enabled;
 
 	if (enabled) {
 		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
 	} else {
 		log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
 	}
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!interval) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	*interval = knet_h->pmtud_interval;
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if ((!interval) || (interval > 86400)) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->pmtud_interval = interval;
 	log_debug(knet_h, KNET_SUB_HANDLE, "PMTUd interval set to: %u seconds", interval);
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
 				    void *pmtud_notify_fn_private_data,
 				    void (*pmtud_notify_fn) (
 						void *private_data,
 						unsigned int data_mtu))
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->pmtud_notify_fn_private_data = pmtud_notify_fn_private_data;
 	knet_h->pmtud_notify_fn = pmtud_notify_fn;
 	if (knet_h->pmtud_notify_fn) {
 		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn enabled");
 	} else {
 		log_debug(knet_h, KNET_SUB_HANDLE, "pmtud_notify_fn disabled");
 	}
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_pmtud_get(knet_handle_t knet_h,
 			  unsigned int *data_mtu)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!data_mtu) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	*data_mtu = knet_h->data_mtu;
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return 0;
 }
 
 int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg)
 {
 	int savederrno = 0;
 	int err = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!knet_handle_crypto_cfg) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	crypto_fini(knet_h);
 
 	if ((!strncmp("none", knet_handle_crypto_cfg->crypto_model, 4)) || 
 	    ((!strncmp("none", knet_handle_crypto_cfg->crypto_cipher_type, 4)) &&
 	     (!strncmp("none", knet_handle_crypto_cfg->crypto_hash_type, 4)))) {
 		log_debug(knet_h, KNET_SUB_CRYPTO, "crypto is not enabled");
 		err = 0;
 		goto exit_unlock;
 	}
 
 	if (knet_handle_crypto_cfg->private_key_len < KNET_MIN_KEY_LEN) {
 		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too short (min %d): %u",
 			  KNET_MIN_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
 		savederrno = EINVAL;
 		err = -1;
 		goto exit_unlock;
 	}
 
 	if (knet_handle_crypto_cfg->private_key_len > KNET_MAX_KEY_LEN) {
 		log_debug(knet_h, KNET_SUB_CRYPTO, "private key len too long (max %d): %u",
 			  KNET_MAX_KEY_LEN, knet_handle_crypto_cfg->private_key_len);
 		savederrno = EINVAL;
 		err = -1;
 		goto exit_unlock;
 	}
 
 	err = crypto_init(knet_h, knet_handle_crypto_cfg);
 
 	if (err) {
 		err = -2;
 	}
 
 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
+int knet_handle_compress(knet_handle_t knet_h, struct knet_handle_compress_cfg *knet_handle_compress_cfg)
+{
+	int savederrno = 0;
+	int err = 0;
+
+	if (!knet_h) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (!knet_handle_compress_cfg) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
+	if (savederrno) {
+		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
+			strerror(savederrno));
+		errno = savederrno;
+		return -1;
+	}
+
+	err = compress_init(knet_h, knet_handle_compress_cfg);
+	savederrno = errno;
+
+	pthread_rwlock_unlock(&knet_h->global_rwlock);
+	errno = savederrno;
+	return err;
+}
+
 ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0;
 	ssize_t err = 0;
 	struct iovec iov_in;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	memset(&iov_in, 0, sizeof(iov_in));
 	iov_in.iov_base = (void *)buff;
 	iov_in.iov_len = buff_len;
 
 	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
 	savederrno = errno;
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
 
 ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0;
 	ssize_t err = 0;
 	struct iovec iov_out[1];
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}
 
 	memset(iov_out, 0, sizeof(iov_out));
 
 	iov_out[0].iov_base = (void *)buff;
 	iov_out[0].iov_len = buff_len;
 
 	err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
 	savederrno = errno;
 
 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	errno = savederrno;
 	return err;
 }
diff --git a/libknet/internals.h b/libknet/internals.h
index 8f330dde..129c39a3 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,472 +1,481 @@
 /*
  * Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #ifndef __KNET_INTERNALS_H__
 #define __KNET_INTERNALS_H__
 
 /*
  * NOTE: you shouldn't need to include this header normally
  */
 
 #include "libknet.h"
 #include "onwire.h"
 #include "compat.h"
 
 #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
+
 #define KNET_DATABUFSIZE_CRYPT_PAD 1024
 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
 
+#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
+#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
+
 #define KNET_RING_RCVBUFF 8388608
 
 #define PCKT_FRAG_MAX UINT8_MAX
 #define PCKT_RX_BUFS  512
 
 #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX
 
 typedef void *knet_transport_link_t; /* per link transport handle */
 typedef void *knet_transport_t;      /* per knet_h transport handle */
 struct  knet_transport_ops;          /* Forward because of circular dependancy */
 
 struct knet_mmsghdr {
 	struct msghdr msg_hdr;	/* Message header */
 	unsigned int  msg_len;	/* Number of bytes transmitted */
 };
 
 struct knet_link {
 	/* required */
 	struct sockaddr_storage src_addr;
 	struct sockaddr_storage dst_addr;
 	/* configurable */
 	unsigned int dynamic;			/* see KNET_LINK_DYN_ define above */
 	uint8_t  priority;			/* higher priority == preferred for A/P */
 	unsigned long long ping_interval;	/* interval */
 	unsigned long long pong_timeout;	/* timeout */
 	unsigned int latency_fix;		/* precision */
 	uint8_t pong_count;			/* how many ping/pong to send/receive before link is up */
 	uint64_t flags;
 	/* status */
 	struct knet_link_status status;
 	/* internals */
 	uint8_t link_id;
 	uint8_t transport_type;                 /* #defined constant from API */
 	knet_transport_link_t transport_link;   /* link_info_t from transport */
 	int outsock;
 	unsigned int configured:1;		/* set to 1 if src/dst have been configured transport initialized on this link*/
 	unsigned int transport_connected:1;	/* set to 1 if lower level transport is connected */
 	unsigned int latency_exp;
 	uint8_t received_pong;
 	struct timespec ping_last;
 	/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
 	uint32_t proto_overhead;
 	struct timespec pmtud_last;
 	uint32_t last_ping_size;
 	uint32_t last_good_mtu;
 	uint32_t last_bad_mtu;
 	uint32_t last_sent_mtu;
 	uint32_t last_recv_mtu;
 	uint8_t has_valid_mtu;
 };
 
 #define KNET_CBUFFER_SIZE 4096
 
 struct knet_host_defrag_buf {
 	char buf[KNET_DATABUFSIZE];
 	uint8_t in_use;			/* 0 buffer is free, 1 is in use */
 	seq_num_t pckt_seq;		/* identify the pckt we are receiving */
 	uint8_t frag_recv;		/* how many frags did we receive */
 	uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */
 	uint8_t	last_first;		/* special case if we receive the last fragment first */
 	uint16_t frag_size;		/* normal frag size (not the last one) */
 	uint16_t last_frag_size;	/* the last fragment might not be aligned with MTU size */
 	struct timespec last_update;	/* keep time of the last pckt */
 };
 
 struct knet_host {
 	/* required */
 	knet_node_id_t host_id;
 	/* configurable */
 	uint8_t link_handler_policy;
 	char name[KNET_MAX_HOST_LEN];
 	/* status */
 	struct knet_host_status status;
 	/* internals */
 	char circular_buffer[KNET_CBUFFER_SIZE];
 	seq_num_t rx_seq_num;
 	seq_num_t untimed_rx_seq_num;
 	seq_num_t timed_rx_seq_num;
 	uint8_t got_data;
 	/* defrag/reassembly buffers */
 	struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK];
 	char circular_buffer_defrag[KNET_CBUFFER_SIZE];
 	/* link stuff */
 	struct knet_link link[KNET_MAX_LINK];
 	uint8_t active_link_entries;
 	uint8_t active_links[KNET_MAX_LINK];
 	struct knet_host *next;
 };
 
 struct knet_sock {
 	int sockfd[2];   /* sockfd[0] will always be application facing
 			  * and sockfd[1] internal if sockpair has been created by knet */
 	int is_socket;   /* check if it's a socket for recvmmsg usage */
 	int is_created;  /* knet created this socket and has to clean up on exit/del */
 	int in_use;      /* set to 1 if it's use, 0 if free */
 	int has_error;   /* set to 1 if there were errors reading from the sock
 			  * and socket has been removed from epoll */
 };
 
 struct knet_fd_trackers {
 	uint8_t transport; /* transport type (UDP/SCTP...) */
 	uint8_t data_type; /* internal use for transport to define what data are associated
 			    * to this fd */
 	void *data;	   /* pointer to the data */
 };
 
 #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
 
 struct knet_handle {
 	knet_node_id_t host_id;
 	unsigned int enabled:1;
 	struct knet_sock sockfd[KNET_DATAFD_MAX];
 	int logfd;
 	uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
 	int hostsockfd[2];
 	int dstsockfd[2];
 	int send_to_links_epollfd;
 	int recv_from_links_epollfd;
 	int dst_link_handler_epollfd;
 	unsigned int pmtud_interval;
 	unsigned int data_mtu;	/* contains the max data size that we can send onwire
 				 * without frags */
 	struct knet_host *host_head;
 	struct knet_host *host_index[KNET_MAX_HOST];
 	knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
 	struct knet_transport_ops *transport_ops[KNET_MAX_TRANSPORTS+1];
 	struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
 	uint32_t reconnect_int;
 	knet_node_id_t host_ids[KNET_MAX_HOST];
 	size_t host_ids_entries;
 	struct knet_header *recv_from_sock_buf;
 	struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
 	struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
 	struct knet_header *pingbuf;
 	struct knet_header *pmtudbuf;
 	pthread_t send_to_links_thread;
 	pthread_t recv_from_links_thread;
 	pthread_t heartbt_thread;
 	pthread_t dst_link_handler_thread;
 	pthread_t pmtud_link_handler_thread;
 	int lock_init_done;
 	pthread_rwlock_t global_rwlock;		/* global config lock */
 	pthread_mutex_t pmtud_mutex;		/* pmtud mutex to handle conditional send/recv + timeout */
 	pthread_cond_t pmtud_cond;		/* conditional for above */
 	pthread_mutex_t tx_mutex;		/* used to protect knet_send_sync and TX thread */
 	pthread_mutex_t hb_mutex;		/* used to protect heartbeat thread and seq_num broadcasting */
 	struct crypto_instance *crypto_instance;
 	uint16_t sec_header_size;
 	uint16_t sec_block_size;
 	uint16_t sec_hash_size;
 	uint16_t sec_salt_size;
 	unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
 	unsigned char *recv_from_links_buf_crypt;
 	unsigned char *recv_from_links_buf_decrypt;
 	unsigned char *pingbuf_crypt;
 	unsigned char *pmtudbuf_crypt;
+	int compress_model;
+	int compress_max_model;
+	int compress_level;
+	unsigned char *recv_from_links_buf_decompress;
+	unsigned char *send_to_links_buf_compress;
 	seq_num_t tx_seq_num;
 	pthread_mutex_t tx_seq_num_mutex;
 	uint8_t has_loop_link;
 	uint8_t loop_link;
 	void *dst_host_filter_fn_private_data;
 	int (*dst_host_filter_fn) (
 		void *private_data,
 		const unsigned char *outdata,
 		ssize_t outdata_len,
 		uint8_t tx_rx,
 		knet_node_id_t this_host_id,
 		knet_node_id_t src_node_id,
 		int8_t *channel,
 		knet_node_id_t *dst_host_ids,
 		size_t *dst_host_ids_entries);
 	void *pmtud_notify_fn_private_data;
 	void (*pmtud_notify_fn) (
 		void *private_data,
 		unsigned int data_mtu);
 	void *host_status_change_notify_fn_private_data;
 	void (*host_status_change_notify_fn) (
 		void *private_data,
 		knet_node_id_t host_id,
 		uint8_t reachable,
 		uint8_t remote,
 		uint8_t external);
 	void *sock_notify_fn_private_data;
 	void (*sock_notify_fn) (
 		void *private_data,
 		int datafd,
 		int8_t channel,
 		uint8_t tx_rx,
 		int error,
 		int errorno);
 	int fini_in_progress;
 };
 
 /*
  * NOTE: every single operation must be implementend
  *       for every protocol.
  */
 
 typedef struct knet_transport_ops {
 /*
  * transport generic information
  */
 	const char *transport_name;
 	const uint8_t transport_id;
 	uint32_t transport_mtu_overhead;
 /*
  * transport init must allocate the new transport
  * and perform all internal initializations
  * (threads, lists, etc).
  */
 	int (*transport_init)(knet_handle_t knet_h);
 /*
  * transport free must releases _all_ resources
  * allocated by tranport_init
  */
 	int (*transport_free)(knet_handle_t knet_h);
 
 /*
  * link operations should take care of all the
  * sockets and epoll management for a given link/transport set
  * transport_link_disable should return err = -1 and errno = EBUSY
  * if listener is still in use, and any other errno in case
  * the link cannot be disabled.
  *
  * set_config/clear_config are invoked in global write lock context
  */
 	int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
 	int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);
 
 /*
  * transport callback for incoming dynamic connections
  * this is called in global read lock context
  */
 	int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);
 
 /*
  * per transport error handling of recvmmsg
  * (see _handle_recv_from_links comments for details)
  */
 
 /*
  * transport_rx_sock_error is invoked when recvmmsg returns <= 0
  *
  * transport_rx_sock_error is invoked with both global_rdlock
  */
 
 	int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
 
 /*
  * transport_tx_sock_error is invoked with global_rwlock and
  * it's invoked when sendto or sendmmsg returns =< 0
  *
  * it should return:
  * -1 on internal error
  *  0 ignore error and continue
  *  1 retry
  *    any sleep or wait action should happen inside the transport code
  */
 	int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
 
 /*
  * this function is called on _every_ received packet
  * to verify if the packet is data or internal protocol error handling
  *
  * it should return:
  * -1 on error
  *  0 packet is not data and we should continue the packet process loop
  *  1 packet is not data and we should STOP the packet process loop
  *  2 packet is data and should be parsed as such
  *
  * transport_rx_is_data is invoked with both global_rwlock
  * and fd_tracker read lock (from RX thread)
  */
 	int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
 } knet_transport_ops_t;
 
 socklen_t sockaddr_len(const struct sockaddr_storage *ss);
 
 /**
  * This is a kernel style list implementation.
  *
  * @author Steven Dake <sdake@redhat.com>
  */
 
 struct knet_list_head {
 	struct knet_list_head *next;
 	struct knet_list_head *prev;
 };
 
 /**
  * @def KNET_LIST_DECLARE()
  * Declare and initialize a list head.
  */
 #define KNET_LIST_DECLARE(name) \
     struct knet_list_head name = { &(name), &(name) }
 
 #define KNET_INIT_LIST_HEAD(ptr) do { \
 	(ptr)->next = (ptr); (ptr)->prev = (ptr); \
 } while (0)
 
 /**
  * Initialize the list entry.
  *
  * Points next and prev pointers to head.
  * @param head pointer to the list head
  */
 static inline void knet_list_init(struct knet_list_head *head)
 {
 	head->next = head;
 	head->prev = head;
 }
 
 /**
  * Add this element to the list.
  *
  * @param element the new element to insert.
  * @param head pointer to the list head
  */
 static inline void knet_list_add(struct knet_list_head *element,
 			       struct knet_list_head *head)
 {
 	head->next->prev = element;
 	element->next = head->next;
 	element->prev = head;
 	head->next = element;
 }
 
 /**
  * Add to the list (but at the end of the list).
  *
  * @param element pointer to the element to add
  * @param head pointer to the list head
  * @see knet_list_add()
  */
 static inline void knet_list_add_tail(struct knet_list_head *element,
 				    struct knet_list_head *head)
 {
 	head->prev->next = element;
 	element->next = head;
 	element->prev = head->prev;
 	head->prev = element;
 }
 
 /**
  * Delete an entry from the list.
  *
  * @param _remove the list item to remove
  */
 static inline void knet_list_del(struct knet_list_head *_remove)
 {
 	_remove->next->prev = _remove->prev;
 	_remove->prev->next = _remove->next;
 }
 
 /**
  * Replace old entry by new one
  * @param old: the element to be replaced
  * @param new: the new element to insert
  */
 static inline void knet_list_replace(struct knet_list_head *old,
 		struct knet_list_head *new)
 {
 	new->next = old->next;
 	new->next->prev = new;
 	new->prev = old->prev;
 	new->prev->next = new;
 }
 
 /**
  * Tests whether list is the last entry in list head
  * @param list: the entry to test
  * @param head: the head of the list
  * @return boolean true/false
  */
 static inline int knet_list_is_last(const struct knet_list_head *list,
 		const struct knet_list_head *head)
 {
 	return list->next == head;
 }
 
 /**
  * A quick test to see if the list is empty (pointing to it's self).
  * @param head pointer to the list head
  * @return boolean true/false
  */
 static inline int32_t knet_list_empty(const struct knet_list_head *head)
 {
 	return head->next == head;
 }
 
 
 /**
  * Get the struct for this entry
  * @param ptr:	the &struct list_head pointer.
  * @param type:	the type of the struct this is embedded in.
  * @param member:	the name of the list_struct within the struct.
  */
 #define knet_list_entry(ptr,type,member)\
 	((type *)((char *)(ptr)-(char*)(&((type *)0)->member)))
 
 /**
  * Get the first element from a list
  * @param ptr:	the &struct list_head pointer.
  * @param type:	the type of the struct this is embedded in.
  * @param member:	the name of the list_struct within the struct.
  */
 #define knet_list_first_entry(ptr, type, member) \
 	knet_list_entry((ptr)->next, type, member)
 
 /**
  * Iterate over a list
  * @param pos:	the &struct list_head to use as a loop counter.
  * @param head:	the head for your list.
  */
 #define knet_list_for_each(pos, head) \
 	for (pos = (head)->next; pos != (head); pos = pos->next)
 
 /**
  * Iterate over a list backwards
  * @param pos:	the &struct list_head to use as a loop counter.
  * @param head:	the head for your list.
  */
 #define knet_list_for_each_reverse(pos, head) \
 	for (pos = (head)->prev; pos != (head); pos = pos->prev)
 
 /**
  * Iterate over a list safe against removal of list entry
  * @param pos:	the &struct list_head to use as a loop counter.
  * @param n:		another &struct list_head to use as temporary storage
  * @param head:	the head for your list.
  */
 #define knet_list_for_each_safe(pos, n, head) \
 	for (pos = (head)->next, n = pos->next; pos != (head); \
 		pos = n, n = pos->next)
 
 /**
  * Iterate over list of given type
  * @param pos:	the type * to use as a loop counter.
  * @param head:	the head for your list.
  * @param member:	the name of the list_struct within the struct.
  */
 #define knet_list_for_each_entry(pos, head, member)			\
 	for (pos = knet_list_entry((head)->next, typeof(*pos), member);	\
 	     &pos->member != (head);					\
 	     pos = knet_list_entry(pos->member.next, typeof(*pos), member))
 
 
 #endif
diff --git a/libknet/libknet.h b/libknet/libknet.h
index 9b5a8973..a5842a10 100644
--- a/libknet/libknet.h
+++ b/libknet/libknet.h
@@ -1,1585 +1,1627 @@
 /*
  * Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #ifndef __LIBKNET_H__
 #define __LIBKNET_H__
 
 #include <stdint.h>
 #include <time.h>
 #include <netinet/in.h>
 
 /*
  * libknet limits
  */
 
 /*
  * Maximum number of hosts
  */
 
 typedef uint16_t knet_node_id_t;
 #define KNET_MAX_HOST 65536
 
 /*
  * Maximum number of links between 2 hosts
  */
 
 #define KNET_MAX_LINK 8
 
 /*
  * Maximum packet size that should be written to datafd
  *  see knet_handle_new for details
  */
 
 #define KNET_MAX_PACKET_SIZE 65536
 
 /*
  * Buffers used for pretty logging
  *  host is used to store both ip addresses and hostnames
  */
 
 #define KNET_MAX_HOST_LEN 256
 #define KNET_MAX_PORT_LEN 6
 
 /*
  * Some notifications can be generated either on TX or RX
  */
 
 #define KNET_NOTIFY_TX 0
 #define KNET_NOTIFY_RX 1
 
 
 /*
  * Link flags
  */
 
 /*
  * Where possible, set traffic priority to high.
  * On Linux this sets the TOS to INTERACTIVE (6),
  * see tc-prio(8) for more infomation
  */
 #define KNET_LINK_FLAG_TRAFFICHIPRIO (1ULL << 0)
 
 
 typedef struct knet_handle *knet_handle_t;
 
 /*
  * Handle structs/API calls
  */
 
 /*
  * knet_handle_new
  *
  * host_id  - Each host in a knet is identified with a unique
  *            ID. when creating a new handle local host_id
  *            must be specified (0 to UINT16T_MAX are all valid).
  *            It is the user's responsibility to check that the value
  *            is unique, or bad things might happen.
  *
  * log_fd   - Write file descriptor. If set to a value > 0, it will be used
  *            to write log packets (see below) from libknet to the application.
  *            Setting to 0 will disable logging from libknet.
  *            It is possible to enable logging at any given time (see logging API
  *            below).
  *            Make sure to either read from this filedescriptor properly and/or
  *            mark it O_NONBLOCK, otherwise if the fd becomes full, libknet could
  *            block.
  *
  * default_log_level -
  *            If logfd is specified, it will initialize all subsystems to log
  *            at default_log_level value. (see logging API below)
  *
  * on success, a new knet_handle_t is returned.
  * on failure, NULL is returned and errno is set.
  */
 
 knet_handle_t knet_handle_new(knet_node_id_t host_id,
 			      int      log_fd,
 			      uint8_t  default_log_level);
 
 /*
  * knet_handle_free
  *
  * knet_h   - pointer to knet_handle_t
  *
  * Destroy a knet handle, free all resources
  *
  * knet_handle_free returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_free(knet_handle_t knet_h);
 
 /*
  * knet_handle_enable_sock_notify
  *
  * knet_h   - pointer to knet_handle_t
  *
  * sock_notify_fn_private_data
  *            void pointer to data that can be used to identify
  *            the callback.
  *
  * sock_notify_fn
  *            A callback function that is invoked every time
  *            a socket in the datafd pool will report an error (-1)
  *            or an end of read (0) (see socket.7).
  *            This function MUST NEVER block or add substantial delays.
  *            The callback is invoked in an internal unlocked area
  *            to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd
  *            to swap/replace the bad fd.
  *            if both err and errno are 0, it means that the socket
  *            has received a 0 byte packet (EOF?).
  *            The callback function must either remove the fd from knet
  *            (by calling knet_handle_remove_fd()) or dup a new fd in its place.
  *            Failure to do this can cause problems.
  *
  * knet_handle_enable_sock_notify returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_enable_sock_notify(knet_handle_t knet_h,
 				   void *sock_notify_fn_private_data,
 				   void (*sock_notify_fn) (
 						void *private_data,
 						int datafd,
 						int8_t channel,
 						uint8_t tx_rx,
 						int error,
 						int errorno)); /* sorry! can't call it errno ;) */
 
 /*
  * knet_handle_add_datafd
  *
  * IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify
  *            _MUST_ be set and be able to handle both errors (-1) and
  *            0 bytes read / write from the provided datafd.
  *            On read error (< 0) from datafd, the socket is automatically
  *            removed from polling to avoid spinning on dead sockets.
  *            It is safe to call knet_handle_remove_datafd even on sockets
  *            that have been removed.
  *
  * knet_h   - pointer to knet_handle_t
  *
  * *datafd  - read/write file descriptor.
  *            knet will read data here to send to the other hosts
  *            and will write data received from the network.
  *            Each data packet can be of max size KNET_MAX_PACKET_SIZE!
  *            Applications using knet_send/knet_recv will receive a
  *            proper error if the packet size is not within boundaries.
  *            Applications using their own functions to write to the
  *            datafd should NOT write more than KNET_MAX_PACKET_SIZE.
  *
  *            Please refer to handle.c on how to set up a socketpair.
  *
  *            datafd can be 0, and knet_handle_add_datafd will create a properly
  *            populated socket pair the same way as ping_test, or a value
  *            higher than 0. A negative number will return an error.
  *            On exit knet_handle_free will take care to cleanup the
  *            socketpair only if they have been created by knet_handle_add_datafd.
  *
  *            It is possible to pass either sockets or normal fds.
  *            User provided datafd will be marked as non-blocking and close-on-exit.
  *
  * *channel - This value has the same effect of VLAN tagging.
  *            A negative value will auto-allocate a channel.
  *            Setting a value between 0 and 31 will try to allocate that
  *            specific channel (unless already in use).
  *
  *            It is possible to add up to 32 datafds but be aware that each
  *            one of them must have a receiving end on the other host.
  *
  *            Example:
  *            hostA channel 0 will be delivered to datafd on hostB channel 0
  *            hostA channel 1 to hostB channel 1.
  *
  *            Each channel must have a unique file descriptor.
  *
  *            If your application could have 2 channels on one host and one
  *            channel on another host, then you can use dst_host_filter
  *            to manipulate channel values on TX and RX.
  *
  * knet_handle_add_datafd returns:
  *
  * 0 on success
  *   *datafd  will be populated with a socket if the original value was 0
  *            or if a specific fd was set, the value is untouched.
  *   *channel will be populated with a channel number if the original value
  *            was negative or the value is untouched if a specific channel
  *            was requested.
  *
  * -1 on error and errno is set.
  *   *datafd and *channel are untouched or empty.
  */
 
 #define KNET_DATAFD_MAX 32
 
 int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel);
 
 /*
  * knet_handle_remove_datafd
  *
  * knet_h   - pointer to knet_handle_t
  *
  * datafd   - file descriptor to remove.
  *            NOTE that if the socket/fd was created by knet_handle_add_datafd,
  *                 the socket will be closed by libknet.
  *
  * knet_handle_remove_datafd returns:
  *
  * 0 on success
  *
  * -1 on error and errno is set.
  */
 
 int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd);
 
 /*
  * knet_handle_get_channel
  *
  * knet_h  - pointer to knet_handle_t
  *
  * datafd  - get the channel associated to this datafd
  *
  * *channel - will contain the result
  *
  * knet_handle_get_channel returns:
  *
  * 0 on success
  *   and *channel will contain the result
  *
  * -1 on error and errno is set.
  *   and *channel content is meaningless
  */
 
 int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel);
 
 /*
  * knet_handle_get_datafd
  *
  * knet_h   - pointer to knet_handle_t
  *
  * channel  - get the datafd associated to this channel
  *
  * *datafd  - will contain the result
  *
  * knet_handle_get_datafd returns:
  *
  * 0 on success
  *   and *datafd will contain the results
  *
  * -1 on error and errno is set.
  *   and *datafd content is meaningless
  */
 
 int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd);
 
 /*
  * knet_recv
  *
  * knet_h   - pointer to knet_handle_t
  *
  * buff     - pointer to buffer to store the received data
  *
  * buff_len - buffer lenght
  *
  * knet_recv is a commodity function to wrap iovec operations
  * around a socket. It returns a call to readv(2).
  */
 
 ssize_t knet_recv(knet_handle_t knet_h,
 		  char *buff,
 		  const size_t buff_len,
 		  const int8_t channel);
 
 /*
  * knet_send
  *
  * knet_h   - pointer to knet_handle_t
  *
  * buff     - pointer to the buffer of data to send
  *
  * buff_len - length of data to send
  *
  * knet_send is a commodity function to wrap iovec operations
  * around a socket. It returns a call to writev(2).
  */
 
 ssize_t knet_send(knet_handle_t knet_h,
 		  const char *buff,
 		  const size_t buff_len,
 		  const int8_t channel);
 
 /*
  * knet_send_sync
  *
  * knet_h   - pointer to knet_handle_t
  *
  * buff     - pointer to the buffer of data to send
  *
  * buff_len - length of data to send
  *
  * channel  - data channel to use (see knet_handle_add_datafd)
  *
  * All knet RX/TX operations are async for performance reasons.
  * There are applications that might need a sync version of data
  * transmission and receive errors in case of failure to deliver
  * to another host.
  * knet_send_sync bypasses the whole TX async layer and delivers
  * data directly to the link layer, and returns errors accordingly.
  * knet_send_sync allows to send only one packet to one host at
  * a time. It does NOT support multiple destinations or multicast
  * packets. Decision is still based on dst_host_filter_fn.
  *
  * knet_send_sync returns 0 on success and -1 on error.
  *
  * In addition to normal sendmmsg errors, knet_send_sync can fail
  * due to:
  *
  * ECANCELED - data forward is disabled
  * EFAULT    - dst_host_filter fatal error
  * EINVAL    - dst_host_filter did not provide
  *             dst_host_ids_entries on unicast pckts
  * E2BIG     - dst_host_filter did return more than one
  *             dst_host_ids_entries on unicast pckts
  * ENOMSG    - received unknown message type
  * EHOSTDOWN - unicast pckt cannot be delivered because
  *             dest host is not connected yet
  * ECHILD    - crypto failed
  * EAGAIN    - sendmmsg was unable to send all messages and
  *             there was no progress during retry
  */
 
 int knet_send_sync(knet_handle_t knet_h,
 		   const char *buff,
 		   const size_t buff_len,
 		   const int8_t channel);
 
 /*
  * knet_handle_enable_filter
  *
  * knet_h   - pointer to knet_handle_t
  *
  * dst_host_filter_fn_private_data
  *            void pointer to data that can be used to identify
  *            the callback.
  *
  * dst_host_filter_fn -
  *            is a callback function that is invoked every time
  *            a packet hits datafd (see knet_handle_new).
  *            the function allows users to tell libknet where the
  *            packet has to be delivered.
  *
  *            const unsigned char *outdata - is a pointer to the
  *                                           current packet
  *            ssize_t outdata_len          - lenght of the above data
  *            uint8_t tx_rx                - filter is called on tx or rx
  *                                           (see defines below)
  *            knet_node_id_t this_host_id  - host_id processing the packet
  *            knet_node_id_t src_host_id   - host_id that generated the
  *                                           packet
  *            knet_node_id_t *dst_host_ids - array of KNET_MAX_HOST knet_node_id_t
  *                                           where to store the destinations
  *            size_t *dst_host_ids_entries - number of hosts to send the message
  *
  * dst_host_filter_fn should return
  * -1 on error, packet is discarded.
  *  0 packet is unicast and should be sent to dst_host_ids and there are
  *    dst_host_ids_entries in the buffer.
  *  1 packet is broadcast/multicast and is sent all hosts.
  *    contents of dst_host_ids and dst_host_ids_entries are ignored.
  *  (see also kronosnetd/etherfilter.* for an example that filters based
  *   on ether protocol)
  *
  * knet_handle_enable_filter returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_enable_filter(knet_handle_t knet_h,
 			      void *dst_host_filter_fn_private_data,
 			      int (*dst_host_filter_fn) (
 					void *private_data,
 					const unsigned char *outdata,
 					ssize_t outdata_len,
 					uint8_t tx_rx,
 					knet_node_id_t this_host_id,
 					knet_node_id_t src_host_id,
 					int8_t *channel,
 					knet_node_id_t *dst_host_ids,
 					size_t *dst_host_ids_entries));
 
 /*
  * knet_handle_setfwd
  *
  * knet_h   - pointer to knet_handle_t
  *
  * enable   - set to 1 to allow data forwarding, 0 to disable data forwarding.
  *
  * knet_handle_setfwd returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  *
  * By default data forwarding is off and no traffic will pass through knet until
  * it is set on.
  */
 
 int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled);
 
 /*
  * knet_handle_pmtud_setfreq
  *
  * knet_h   - pointer to knet_handle_t
  *
  * interval - define the interval in seconds between PMTUd scans
  *            range from 1 to 86400 (24h)
  *
  * knet_handle_pmtud_setfreq returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  *
  * default interval is 60.
  */
 
 #define KNET_PMTUD_DEFAULT_INTERVAL 60
 
 int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval);
 
 /*
  * knet_handle_pmtud_getfreq
  *
  * knet_h   - pointer to knet_handle_t
  *
  * interval - pointer where to store the current interval value
  *
  * knet_handle_pmtud_setfreq returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 
 int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval);
 
 /*
  * knet_handle_enable_pmtud_notify
  *
  * knet_h   - pointer to knet_handle_t
  *
  * pmtud_notify_fn_private_data
  *            void pointer to data that can be used to identify
  *            the callback.
  *
  * pmtud_notify_fn
  *            is a callback function that is invoked every time
  *            a path MTU size change is detected.
  *            The function allows libknet to notify the user
  *            of data MTU, that's the max value that can be send
  *            onwire without fragmentation. The data MTU will always
  *            be lower than real link MTU because it accounts for
  *            protocol overhead, knet packet header and (if configured)
  *            crypto overhead,
  *            This function MUST NEVER block or add substantial delays.
  *
  * knet_handle_enable_pmtud_notify returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_enable_pmtud_notify(knet_handle_t knet_h,
 			      void *pmtud_notify_fn_private_data,
 			      void (*pmtud_notify_fn) (
 					void *private_data,
 					unsigned int data_mtu));
 
 /*
  * knet_handle_pmtud_get
  *
  * knet_h   - pointer to knet_handle_t
  *
  * data_mtu - pointer where to store data_mtu (see above)
  *
  * knet_handle_pmtud_get returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_pmtud_get(knet_handle_t knet_h,
 				unsigned int *data_mtu);
 
 /*
  * knet_handle_crypto
  *
  * knet_h   - pointer to knet_handle_t
  *
  * knet_handle_crypto_cfg -
  *            pointer to a knet_handle_crypto_cfg structure
  *
  *            crypto_model should contain the model name.
  *                         Currently only "nss" is supported.
  *                         Setting to "none" will disable crypto.
  *
  *            crypto_cipher_type
  *                         should contain the cipher algo name.
  *                         It can be set to "none" to disable
  *                         encryption.
  *                         Currently supported by "nss" model:
  *                         "3des", "aes128", "aes192" and "aes256".
  *
  *            crypto_hash_type
  *                         should contain the hashing algo name.
  *                         It can be set to "none" to disable
  *                         hashing.
  *                         Currently supported by "nss" model:
  *                         "md5", "sha1", "sha256", "sha384" and "sha512".
  *
  *            private_key  will contain the private shared key.
  *                         It has to be at least KNET_MIN_KEY_LEN long.
  *
  *            private_key_len
  *                         length of the provided private_key.
  *
  * Implementation notes/current limitations:
  * - enabling crypto, will increase latency as packets have
  *   to processed.
  * - enabling crypto might reduce the overall throughtput
  *   due to crypto data overhead.
  * - re-keying is not implemented yet.
  * - private/public key encryption/hashing is not currently
  *   planned.
  * - crypto key must be the same for all hosts in the same
  *   knet instance.
  * - it is safe to call knet_handle_crypto multiple times at runtime.
  *   The last config will be used.
  *   IMPORTANT: a call to knet_handle_crypto can fail due to:
  *              1) failure to obtain locking
  *              2) errors to initializing the crypto level.
  *   This can happen even in subsequent calls to knet_handle_crypto.
  *   A failure in crypto init, might leave your traffic unencrypted!
  *   It's best to stop data forwarding (see above), change crypto config,
  *   start forward again.
  *
  * knet_handle_crypto returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  * -2 on crypto subsystem initialization error. No errno is provided at the moment (yet).
  */
 
 #define KNET_MIN_KEY_LEN  256
 #define KNET_MAX_KEY_LEN 4096
 
 struct knet_handle_crypto_cfg {
 	char		crypto_model[16];
 	char		crypto_cipher_type[16];
 	char		crypto_hash_type[16];
 	unsigned char	private_key[KNET_MAX_KEY_LEN];
 	unsigned int	private_key_len;
 };
 
 int knet_handle_crypto(knet_handle_t knet_h,
 		       struct knet_handle_crypto_cfg *knet_handle_crypto_cfg);
 
+/*
+ * knet_handle_compress
+ *
+ * knet_h   - pointer to knet_handle_t
+ *
+ * knet_handle_compress_cfg -
+ *            pointer to a knet_handle_compress_cfg structure
+ *
+ *            compress_model should contain the mode name.
+ *                           Currently only "zlib" is supported.
+ *                           Setting to "none" will disable compress.
+ *
+ *            compress_level many compression libraries adopted
+ *                           the standard values ranging from 0 for
+ *                           fast compression to 9 for high compression.
+ *                           Please refere to the library man pages
+ *                           on how to be set this value, as it is passed
+ *                           unmodified to the compression algorithm.
+ *
+ * Implementation notes:
+ * - it is possible to enable/disable compression at any time.
+ * - nodes can be using different compression algorithm at any time.
+ * - knet does NOT implement compression algorithm directly. it relies
+ *   on external libraries for this functionality. Please read
+ *   the libraries man pages to figure out which algorithm/compression
+ *   level is best for the data you are planning to transmit.
+ *
+ * knet_handle_compress returns:
+ *
+ * 0 on success
+ * -1 on error and errno is set. EINVAL means that either the model or the
+ *    level are not supported.
+ */
+
+struct knet_handle_compress_cfg {
+	char	compress_model[16];
+	int	compress_level;
+};
+
+int knet_handle_compress(knet_handle_t knet_h,
+			 struct knet_handle_compress_cfg *knet_handle_compress_cfg);
+
 /*
  * host structs/API calls
  */
 
 /*
  * knet_host_add
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - each host in a knet is identified with a unique ID
  *            (see also knet_handle_new documentation above)
  *
  * knet_host_add returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_add(knet_handle_t knet_h, knet_node_id_t host_id);
 
 /*
  * knet_host_remove
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - each host in a knet is identified with a unique ID
  *            (see also knet_handle_new documentation above)
  *
  * knet_host_remove returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_remove(knet_handle_t knet_h, knet_node_id_t host_id);
 
 /*
  * knet_host_set_name
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - see above
  *
  * name     - this name will be used for pretty logging and eventually
  *            search for hosts (see also get_name and get_id below).
  *            Only up to KNET_MAX_HOST_LEN - 1 bytes will be accepted and
  *            name has to be unique for each host.
  *
  * knet_host_set_name returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_set_name(knet_handle_t knet_h, knet_node_id_t host_id,
 		       const char *name);
 
 /*
  * knet_host_get_name_by_host_id
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - see above
  *
  * name     - pointer to a preallocated buffer of at least size KNET_MAX_HOST_LEN
  *            where the current host name will be stored
  *            (as set by knet_host_set_name or default by knet_host_add)
  *
  * knet_host_get_name_by_host_id returns:
  *
  * 0 on success
  * -1 on error and errno is set (name is left untouched)
  */
 
 int knet_host_get_name_by_host_id(knet_handle_t knet_h, knet_node_id_t host_id,
 				  char *name);
 
 /*
  * knet_host_get_id_by_host_name
  *
  * knet_h   - pointer to knet_handle_t
  *
  * name     - name to lookup, max len KNET_MAX_HOST_LEN
  *
  * host_id  - where to store the result
  *
  * knet_host_get_id_by_host_name returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name,
 				  knet_node_id_t *host_id);
 
 /*
  * knet_host_get_host_list
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_ids - array of at lest KNET_MAX_HOST size
  *
  * host_ids_entries -
  *            number of entries writted in host_ids
  *
  * knet_host_get_host_list returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_get_host_list(knet_handle_t knet_h,
 			    knet_node_id_t *host_ids, size_t *host_ids_entries);
 
 /*
  * define switching policies
  */
 
 #define KNET_LINK_POLICY_PASSIVE 0
 #define KNET_LINK_POLICY_ACTIVE  1
 #define KNET_LINK_POLICY_RR      2
 
 /*
  * knet_host_set_policy
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - see above
  *
  * policy   - there are currently 3 kind of simple switching policies
  *            as defined above, based on link configuration.
  *            KNET_LINK_POLICY_PASSIVE - the active link with the lowest
  *                                       priority will be used.
  *                                       if one or more active links share
  *                                       the same priority, the one with
  *                                       lowest link_id will be used.
  *
  *            KNET_LINK_POLICY_ACTIVE  - all active links will be used
  *                                       simultaneously to send traffic.
  *                                       link priority is ignored.
  *
  *            KNET_LINK_POLICY_RR      - round-robin policy, every packet
  *                                       will be send on a different active
  *                                       link.
  *
  * knet_host_set_policy returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_set_policy(knet_handle_t knet_h, knet_node_id_t host_id,
 			 uint8_t policy);
 
 /*
  * knet_host_get_policy
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_id  - see above
  *
  * policy   - will contain the current configured switching policy.
  *            Default is passive when creating a new host.
  *
  * knet_host_get_policy returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_get_policy(knet_handle_t knet_h, knet_node_id_t host_id,
 			 uint8_t *policy);
 
 /*
  * knet_host_enable_status_change_notify
  *
  * knet_h   - pointer to knet_handle_t
  *
  * host_status_change_notify_fn_private_data
  *            void pointer to data that can be used to identify
  *            the callback.
  *
  * host_status_change_notify_fn
  *            is a callback function that is invoked every time
  *            there is a change in the host status.
  *            host status is identified by:
  *            - reachable, this host can send/receive data to/from host_id
  *            - remote, 0 if the host_id is connected locally or 1 if
  *                      the there is one or more knet host(s) in between.
  *                      NOTE: re-switching is NOT currently implemented,
  *                            but this is ready for future and can avoid
  *                            an API/ABI breakage later on.
  *            - external, 0 if the host_id is configured locally or 1 if
  *                        it has been added from remote nodes config.
  *                        NOTE: dynamic topology is NOT currently implemented,
  *                        but this is ready for future and can avoid
  *                        an API/ABI breakage later on.
  *            This function MUST NEVER block or add substantial delays.
  *
  * knet_host_status_change_notify returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_enable_status_change_notify(knet_handle_t knet_h,
 					  void *host_status_change_notify_fn_private_data,
 					  void (*host_status_change_notify_fn) (
 						void *private_data,
 						knet_node_id_t host_id,
 						uint8_t reachable,
 						uint8_t remote,
 						uint8_t external));
 
 /*
  * define host status structure for quick lookup
  * struct is in flux as more stats will be added soon
  *
  * reachable             host_id can be seen either directly connected
  *                       or via another host_id
  *
  * remote                0 = node is connected locally, 1 is visible via
  *                       via another host_id
  *
  * external              0 = node is configured/known locally,
  *                       1 host_id has been received via another host_id
  */
 
 struct knet_host_status {
 	uint8_t reachable;
 	uint8_t remote;
 	uint8_t external;
 	/* add host statistics */
 };
 
 /*
  * knet_host_status_get
  *
  * knet_h   - pointer to knet_handle_t
  *
  * status    - pointer to knet_host_status struct (see above)
  *
  * knet_handle_pmtud_get returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_host_get_status(knet_handle_t knet_h, knet_node_id_t host_id,
 			 struct knet_host_status *status);
 
 /*
  * link structs/API calls
  *
  * every host allocated/managed by knet_host_* has
  * KNET_MAX_LINK structures to define the network
  * paths that connect 2 hosts.
  *
  * Each link is identified by a link_id that has a
  * values between 0 and KNET_MAX_LINK - 1.
  *
  * KNOWN LIMITATIONS:
  *
  * - let's assume the scenario where two hosts are connected
  *   with any number of links. link_id must match on both sides.
  *   If host_id 0 link_id 0 is configured to connect IP1 to IP2 and
  *   host_id 0 link_id 1 is configured to connect IP3 to IP4,
  *   host_id 1 link_id 0 _must_ connect IP2 to IP1 and likewise
  *   host_id 1 link_id 1 _must_ connect IP4 to IP3.
  *   We might be able to lift this restriction in future, by using
  *   other data to determine src/dst link_id, but for now, deal with it.
- *
- * -
  */
 
 /*
  * commodity functions to convert strings to sockaddr and viceversa
  */
 
 /*
  * knet_strtoaddr
  *
  * host      - IPaddr/hostname to convert
  *             be aware only the first IP address will be returned
  *             in case a hostname resolves to multiple IP
  *
  * port      - port to connect to
  *
  * ss        - sockaddr_storage where to store the converted data
  *
  * sslen     - len of the sockaddr_storage
  *
  * knet_strtoaddr returns same error codes as getaddrinfo
  *
  */
 
 int knet_strtoaddr(const char *host, const char *port,
 		   struct sockaddr_storage *ss, socklen_t sslen);
 
 /*
  * knet_addrtostr
  *
  * ss        - sockaddr_storage to convert
  *
  * sslen     - len of the sockaddr_storage
  *
  * host      - IPaddr/hostname where to store data
  *             (recommended size: KNET_MAX_HOST_LEN)
  *
  * port      - port buffer where to store data
  *             (recommended size: KNET_MAX_PORT_LEN)
  *
  * knet_strtoaddr returns same error codes as getnameinfo
- *
  */
 
 int knet_addrtostr(const struct sockaddr_storage *ss, socklen_t sslen,
 		   char *addr_buf, size_t addr_buf_size,
 		   char *port_buf, size_t port_buf_size);
 
 /*
  * knet_handle_get_transport_list
  *
  * knet_h                 - pointer to knet_handle_t
  *
  * transport_list         - an array of struct transport_info that must be
  *                          at least of size struct transport_info * KNET_MAX_TRANSPORTS
  *
  * transport_list_entries - pointer to a size_t where to store how many transports
  *                          are available in this build of libknet.
  *
  * knet_handle_get_transport_list returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 #define KNET_TRANSPORT_LOOPBACK 0
 #define KNET_TRANSPORT_UDP      1
 #define KNET_TRANSPORT_SCTP     2
 #define KNET_MAX_TRANSPORTS     3
 
 /*
  * The Loopback transport is only valid for connections to localhost, the host
  * with the same node_id specified in knet_handle_new(). Only one link of this
  * type is allowed. Data sent down a LOOPBACK link will be copied directly from
  * the knet send datafd to the knet receive datafd so the application must be set
  * up to take data from that socket at least as often as it is sent or deadlocks
  * could occur. If used, a LOOPBACK link must be the only link configured to the
  * local host.
  */
 
 struct transport_info {
 	const char *name;   /* UDP/SCTP/etc... */
 	uint8_t id;         /* value that can be used for link_set_config */
 	uint8_t properties; /* currently unused */
 };
 
 int knet_handle_get_transport_list(knet_handle_t knet_h,
 				   struct transport_info *transport_list, size_t *transport_list_entries);
 
 /*
  * knet_handle_get_transport_name_by_id
  *
  * knet_h    - pointer to knet_handle_t
  *
  * transport - one of the above KNET_TRANSPORT_xxx constants
  *
  * knet_handle_get_transport_name_by_id returns:
  *
  * pointer to the name on success or
  * NULL on error and errno is set.
  */
 
 const char *knet_handle_get_transport_name_by_id(knet_handle_t knet_h, uint8_t transport);
 
 /*
  * knet_handle_get_transport_id_by_name
  *
  * knet_h    - pointer to knet_handle_t
  *
  * name      - transport name (UDP/SCTP/etc)
  *
  * knet_handle_get_transport_name_by_id returns:
  *
  * KNET_MAX_TRANSPORTS on error and errno is set accordingly
  * KNET_TRANSPORT_xxx on success.
  */
 
 uint8_t knet_handle_get_transport_id_by_name(knet_handle_t knet_h, const char *name);
 
 /*
  * knet_handle_set_transport_reconnect_interval
  *
  * knet_h    - pointer to knet_handle_t
  *
  * msecs     - milliseconds
  *
  * knet_handle_set_transport_reconnect_interval returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 #define KNET_TRANSPORT_DEFAULT_RECONNECT_INTERVAL 1000
 
 int knet_handle_set_transport_reconnect_interval(knet_handle_t knet_h, uint32_t msecs);
 
 /*
  * knet_handle_get_transport_reconnect_interval
  *
  * knet_h    - pointer to knet_handle_t
  *
  * msecs     - milliseconds
  *
  * knet_handle_get_transport_reconnect_interval returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_handle_get_transport_reconnect_interval(knet_handle_t knet_h, uint32_t *msecs);
 
 /*
  * knet_link_set_config
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * transport - one of the above KNET_TRANSPORT_xxx constants
  *
  * src_addr  - sockaddr_storage that can be either IPv4 or IPv6
  *
  * dst_addr  - sockaddr_storage that can be either IPv4 or IPv6
  *             this can be null if we don't know the incoming
  *             IP address/port and the link will remain quiet
  *             till the node on the other end will initiate a
  *             connection
  *
  * flags     - KNET_LINK_FLAG_*
  *
  * knet_link_set_config returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			 uint8_t transport,
 			 struct sockaddr_storage *src_addr,
 			 struct sockaddr_storage *dst_addr,
 			 uint64_t flags);
 
 /*
  * knet_link_get_config
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * transport - see above
  *
  * src_addr  - sockaddr_storage that can be either IPv4 or IPv6
  *
  * dst_addr  - sockaddr_storage that can be either IPv4 or IPv6
  *
  * dynamic   - 0 if dst_addr is static or 1 if dst_addr is dynamic.
  *             In case of 1, dst_addr can be NULL and it will be left
  *             untouched.
  *
  * flags     - KNET_LINK_FLAG_*
  *
  * knet_link_get_config returns:
  *
  * 0 on success.
  * -1 on error and errno is set.
  */
 
 int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			 uint8_t *transport,
 			 struct sockaddr_storage *src_addr,
 			 struct sockaddr_storage *dst_addr,
 			 uint8_t *dynamic,
 			 uint64_t *flags);
 
 /*
  * knet_link_clear_config
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * knet_link_clear_config returns:
  *
  * 0 on success.
  * -1 on error and errno is set.
  */
 
 int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id);
 
 /*
  * knet_link_set_enable
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * enabled   - 0 disable the link, 1 enable the link
  *
  * knet_link_set_enable returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			 unsigned int enabled);
 
 /*
  * knet_link_get_enable
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * enabled   - 0 disable the link, 1 enable the link
  *
  * knet_link_get_enable returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			 unsigned int *enabled);
 
 /*
  * knet_link_set_ping_timers
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * interval  - specify the ping interval
  *
  * timeout   - if no pong is received within this time,
  *             the link is declared dead
  *
  * precision - how many values of latency are used to calculate
  *             the average link latency (see also get_status below)
  *
  * knet_link_set_ping_timers returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 #define KNET_LINK_DEFAULT_PING_INTERVAL  1000 /* 1 second */
 #define KNET_LINK_DEFAULT_PING_TIMEOUT   2000 /* 2 seconds */
 #define KNET_LINK_DEFAULT_PING_PRECISION 2048 /* samples */
 
 int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			      time_t interval, time_t timeout, unsigned int precision);
 
 /*
  * knet_link_get_ping_timers
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * interval  - ping intervall
  *
  * timeout   - if no pong is received within this time,
  *             the link is declared dead
  *
  * precision - how many values of latency are used to calculate
  *             the average link latency (see also get_status below)
  *
  * knet_link_get_ping_timers returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			      time_t *interval, time_t *timeout, unsigned int *precision);
 
 /*
  * knet_link_set_pong_count
  *
  * knet_h     - pointer to knet_handle_t
  *
  * host_id    - see above
  *
  * link_id    - see above
  *
  * pong_count - how many valid ping/pongs before a link is marked UP.
  *              default: 5, value should be > 0
  *
  * knet_link_set_pong_count returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 #define KNET_LINK_DEFAULT_PONG_COUNT 5
 
 int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			     uint8_t pong_count);
 
 /*
  * knet_link_get_pong_count
  *
  * knet_h     - pointer to knet_handle_t
  *
  * host_id    - see above
  *
  * link_id    - see above
  *
  * pong_count - see above
  *
  * knet_link_get_pong_count returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			     uint8_t *pong_count);
 
 /*
  * knet_link_set_priority
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * priority  - specify the switching priority for this link
  *             see also knet_host_set_policy
  *
  * knet_link_set_priority returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			   uint8_t priority);
 
 /*
  * knet_link_get_priority
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * priority  - gather the switching priority for this link
  *             see also knet_host_set_policy
  *
  * knet_link_get_priority returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			   uint8_t *priority);
 
 /*
  * knet_link_get_link_list
  *
  * knet_h   - pointer to knet_handle_t
  *
  * link_ids - array of at lest KNET_MAX_LINK size
  *            with the list of configured links for a certain host.
  *
  * link_ids_entries -
  *            number of entries contained in link_ids
  *
  * knet_link_get_link_list returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
 			    uint8_t *link_ids, size_t *link_ids_entries);
 
 /*
  * define link status structure for quick lookup
  *
  * src/dst_{ipaddr,port} strings are filled by
  *                       getnameinfo(3) when configuring the link.
  *                       if the link is dynamic (see knet_link_set_config)
  *                       dst_ipaddr/port will contain ipaddr/port of the currently
  *                       connected peer or "Unknown" if it was not possible
  *                       to determine the ipaddr/port at runtime.
  *
  * enabled               see also knet_link_set/get_enable.
  *
  * connected             the link is connected to a peer and ping/pong traffic
  *                       is flowing.
  *
  * dynconnected          the link has dynamic ip on the other end, and
  *                       we can see the other host is sending pings to us.
  *
  * latency               average latency of this link
  *                       see also knet_link_set/get_timeout.
  *
  * pong_last             if the link is down, this value tells us how long
  *                       ago this link was active. A value of 0 means that the link
  *                       has never been active.
  *
  * knet_link_stats       structure that contains details statistics for the link
  */
 
 #define MAX_LINK_EVENTS 16
 struct knet_link_stats {
 	/* onwire values */
 	uint64_t tx_data_packets;
 	uint64_t rx_data_packets;
 	uint64_t tx_data_bytes;
 	uint64_t rx_data_bytes;
 	uint64_t rx_ping_packets;
 	uint64_t tx_ping_packets;
 	uint64_t rx_ping_bytes;
 	uint64_t tx_ping_bytes;
 	uint64_t rx_pong_packets;
 	uint64_t tx_pong_packets;
 	uint64_t rx_pong_bytes;
 	uint64_t tx_pong_bytes;
 	uint64_t rx_pmtu_packets;
 	uint64_t tx_pmtu_packets;
 	uint64_t rx_pmtu_bytes;
 	uint64_t tx_pmtu_bytes;
 
 	/* Only filled in when requested */
 	uint64_t tx_total_packets;
 	uint64_t rx_total_packets;
 	uint64_t tx_total_bytes;
 	uint64_t rx_total_bytes;
 	uint64_t tx_total_errors;
 	uint64_t tx_total_retries;
 
 	uint32_t tx_pmtu_errors;
 	uint32_t tx_pmtu_retries;
 	uint32_t tx_ping_errors;
 	uint32_t tx_ping_retries;
 	uint32_t tx_pong_errors;
 	uint32_t tx_pong_retries;
 	uint32_t tx_data_errors;
 	uint32_t tx_data_retries;
 
 	/* measured in usecs */
 	uint32_t latency_min;
 	uint32_t latency_max;
 	uint32_t latency_ave;
 	uint32_t latency_samples;
 
 	/* how many times the link has been going up/down */
 	uint32_t down_count;
 	uint32_t up_count;
 
 	/*
 	 * circular buffer of time_t structs collecting the history
 	 * of up/down events on this link.
 	 * the index indicates current/last event.
 	 * it is safe to walk back the history by decreasing the index
 	 */
 	time_t   last_up_times[MAX_LINK_EVENTS];
 	time_t   last_down_times[MAX_LINK_EVENTS];
 	int8_t   last_up_time_index;
 	int8_t   last_down_time_index;
 	/* Always add new stats at the end */
 };
 
 struct knet_link_status {
 	size_t size;                    /* For ABI checking */
 	char src_ipaddr[KNET_MAX_HOST_LEN];
 	char src_port[KNET_MAX_PORT_LEN];
 	char dst_ipaddr[KNET_MAX_HOST_LEN];
 	char dst_port[KNET_MAX_PORT_LEN];
 	uint8_t enabled;	        /* link is configured and admin enabled for traffic */
 	uint8_t connected;              /* link is connected for data (local view) */
 	uint8_t dynconnected;	        /* link has been activated by remote dynip */
 	unsigned long long latency;	/* average latency computed by fix/exp */
 	struct timespec pong_last;
 	unsigned int mtu;		/* current detected MTU on this link */
 	unsigned int proto_overhead;    /* contains the size of the IP protocol, knet headers and
 					 * crypto headers (if configured). This value is filled in
 					 * ONLY after the first PMTUd run on that given link,
 					 * and can change if link configuration or crypto configuration
 					 * changes at runtime.
 					 * WARNING: in general mtu + proto_overhead might or might
 					 * not match the output of ifconfig mtu due to crypto
 					 * requirements to pad packets to some specific boundaries. */
 	/* Link statistics */
 	struct knet_link_stats stats;
 };
 
 /*
  * knet_link_get_status
  *
  * knet_h    - pointer to knet_handle_t
  *
  * host_id   - see above
  *
  * link_id   - see above
  *
  * status    - pointer to knet_link_status struct (see above)
  *
  * struct_size - max size of knet_link_status - allows library to
  *               add fields without ABI change. Returned structure
  *               will be truncated to this length and .size member
  *               indicates the full size.
  *
  * knet_link_get_status returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
 			 struct knet_link_status *status, size_t struct_size);
 
 /*
  * logging structs/API calls
  */
 
 /*
  * libknet is composed of several subsystems. In order
  * to easily distinguish log messages coming from different
  * places, each subsystem has its own ID.
  *
  *  0-19 config/management
  * 20-39 internal threads
  * 40-59 transports
  * 60-69 crypto implementations
  */
 
 #define KNET_SUB_COMMON       0 /* common.c */
 #define KNET_SUB_HANDLE       1 /* handle.c alloc/dealloc config changes */
 #define KNET_SUB_HOST         2 /* host add/del/modify */
 #define KNET_SUB_LISTENER     3 /* listeners add/del/modify... */
 #define KNET_SUB_LINK         4 /* link add/del/modify */
 #define KNET_SUB_TRANSPORT    5 /* Transport common */
 #define KNET_SUB_CRYPTO       6 /* crypto.c config generic layer */
+#define KNET_SUB_COMPRESS     7 /* compress.c config generic layer */
 
 #define KNET_SUB_FILTER      19 /* allocated for users to log from dst_filter */
 
 #define KNET_SUB_DSTCACHE    20 /* switching thread (destination cache handling) */
 #define KNET_SUB_HEARTBEAT   21 /* heartbeat thread */
 #define KNET_SUB_PMTUD       22 /* Path MTU Discovery thread */
 #define KNET_SUB_TX          23 /* send to link thread */
 #define KNET_SUB_RX          24 /* recv from link thread */
 
 #define KNET_SUB_TRANSP_BASE 40 /* Base log level for transports */
 #define KNET_SUB_TRANSP_LOOPBACK (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_LOOPBACK)
 #define KNET_SUB_TRANSP_UDP      (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_UDP)
 #define KNET_SUB_TRANSP_SCTP     (KNET_SUB_TRANSP_BASE + KNET_TRANSPORT_SCTP)
 
 #define KNET_SUB_NSSCRYPTO   60 /* nsscrypto.c */
 
+#define KNET_SUB_ZLIBCOMP    70 /* zlibcompress.c */
+
 #define KNET_SUB_UNKNOWN     254
 #define KNET_MAX_SUBSYSTEMS  KNET_SUB_UNKNOWN + 1
 
 /*
  * Convert between subsystem IDs and names
  */
 
 /*
  * knet_log_get_subsystem_name
  *
  * return internal name of the subsystem or "common"
  */
 
 const char *knet_log_get_subsystem_name(uint8_t subsystem);
 
 /*
  * knet_log_get_subsystem_id
  *
  * return internal ID of the subsystem or KNET_SUB_COMMON
  */
 
 uint8_t knet_log_get_subsystem_id(const char *name);
 
 /*
  * 4 log levels are enough for everybody
  */
 
 #define KNET_LOG_ERR         0 /* unrecoverable errors/conditions */
 #define KNET_LOG_WARN        1 /* recoverable errors/conditions */
 #define KNET_LOG_INFO        2 /* info, link up/down, config changes.. */
 #define KNET_LOG_DEBUG       3
 
 /*
  * Convert between log level values and names
  */
 
 /*
  * knet_log_get_loglevel_name
  *
  * return internal name of the log level or "ERROR" for unknown values
  */
 
 const char *knet_log_get_loglevel_name(uint8_t level);
 
 /*
  * knet_log_get_loglevel_id
  *
  * return internal log level ID or KNET_LOG_ERR for invalid names
  */
 
 uint8_t knet_log_get_loglevel_id(const char *name);
 
 /*
  * every log message is composed by a text message (including a trailing \n)
  * and message level/subsystem IDs.
  * In order to make debugging easier it is possible to send those packets
  * straight to stdout/stderr (see knet_bench.c stdout option).
  */
 
 #define KNET_MAX_LOG_MSG_SIZE    256
 
 struct knet_log_msg {
 	char	msg[KNET_MAX_LOG_MSG_SIZE - (sizeof(uint8_t)*2)];
 	uint8_t	subsystem;	/* KNET_SUB_* */
 	uint8_t msglevel;	/* KNET_LOG_* */
 };
 
 /*
  * knet_log_set_log_level
  *
  * knet_h     - same as above
  *
  * subsystem  - same as above
  *
  * level      - same as above
  *
  * knet_log_set_loglevel allows fine control of log levels by subsystem.
  *                       See also knet_handle_new for defaults.
  *
  * knet_log_set_loglevel returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
 			  uint8_t level);
 
 /*
  * knet_log_get_log_level
  *
  * knet_h     - same as above
  *
  * subsystem  - same as above
  *
  * level      - same as above
  *
  * knet_log_get_loglevel returns:
  *
  * 0 on success
  * -1 on error and errno is set.
  */
 
 int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
 			  uint8_t *level);
 
 #endif
diff --git a/libknet/logging.c b/libknet/logging.c
index 1fb6d346..7d08926f 100644
--- a/libknet/logging.c
+++ b/libknet/logging.c
@@ -1,248 +1,250 @@
 /*
  * Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
  *
  * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <strings.h>
 #include <string.h>
 #include <unistd.h>
 #include <pthread.h>
 #include <stdarg.h>
 #include <errno.h>
 #include <stdio.h>
 
 #include "internals.h"
 #include "logging.h"
 
 struct pretty_names {
 	const char *name;
 	uint8_t val;
 };
 
 static struct pretty_names subsystem_names[] =
 {
 	{ "common", KNET_SUB_COMMON },
 	{ "handle", KNET_SUB_HANDLE },
 	{ "host", KNET_SUB_HOST },
 	{ "listener", KNET_SUB_LISTENER },
 	{ "link", KNET_SUB_LINK },
 	{ "transport", KNET_SUB_TRANSPORT },
 	{ "crypto", KNET_SUB_CRYPTO },
+	{ "compress", KNET_SUB_COMPRESS },
 	{ "filter", KNET_SUB_FILTER },
 	{ "dstcache", KNET_SUB_DSTCACHE },
 	{ "heartbeat", KNET_SUB_HEARTBEAT },
 	{ "pmtud", KNET_SUB_PMTUD },
 	{ "tx", KNET_SUB_TX },
 	{ "rx", KNET_SUB_RX },
 	{ "loopback", KNET_SUB_TRANSP_LOOPBACK },
 	{ "udp", KNET_SUB_TRANSP_UDP },
 	{ "sctp", KNET_SUB_TRANSP_SCTP },
 	{ "nsscrypto", KNET_SUB_NSSCRYPTO },
+	{ "zlibcomp", KNET_SUB_ZLIBCOMP },
 	{ "unknown", KNET_SUB_UNKNOWN }		/* unknown MUST always be last in this array */
 };
 
 const char *knet_log_get_subsystem_name(uint8_t subsystem)
 {
 	unsigned int i;
 
 	for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
 		if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
 			break;
 		}
 		if (subsystem_names[i].val == subsystem) {
 			return subsystem_names[i].name;
 		}
 	}
 	return "unknown";
 }
 
 uint8_t knet_log_get_subsystem_id(const char *name)
 {
 	unsigned int i;
 
 	for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
 		if (subsystem_names[i].val == KNET_SUB_UNKNOWN) {
 			break;
 		}
 		if (strcasecmp(name, subsystem_names[i].name) == 0) {
 			return subsystem_names[i].val;
 		}
 	}
 	return KNET_SUB_UNKNOWN;
 }
 
 static int is_valid_subsystem(uint8_t subsystem)
 {
 	unsigned int i;
 
 	for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) {
 		if ((subsystem != KNET_SUB_UNKNOWN) &&
 		    (subsystem_names[i].val == KNET_SUB_UNKNOWN)) {
 			break;
 		}
 		if (subsystem_names[i].val == subsystem) {
 			return 0;
 		}
 	}
 	return -1;
 }
 
 static struct pretty_names loglevel_names[] =
 {
 	{ "ERROR", KNET_LOG_ERR },
 	{ "WARNING", KNET_LOG_WARN },
 	{ "info", KNET_LOG_INFO },
 	{ "debug", KNET_LOG_DEBUG }
 };
 
 const char *knet_log_get_loglevel_name(uint8_t level)
 {
 	unsigned int i;
 
 	for (i = 0; i <= KNET_LOG_DEBUG; i++) {
 		if (loglevel_names[i].val == level) {
 			return loglevel_names[i].name;
 		}
 	}
 	return "ERROR";
 }
 
 uint8_t knet_log_get_loglevel_id(const char *name)
 {
 	unsigned int i;
 
 	for (i = 0; i <= KNET_LOG_DEBUG; i++) {
 		if (strcasecmp(name, loglevel_names[i].name) == 0) {
 			return loglevel_names[i].val;
 		}
 	}
 	return KNET_LOG_ERR;
 }
 
 int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem,
 			  uint8_t level)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (is_valid_subsystem(subsystem) < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (level > KNET_LOG_DEBUG) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, subsystem, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	knet_h->log_levels[subsystem] = level;
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	return 0;
 }
 
 int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem,
 			  uint8_t *level)
 {
 	int savederrno = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (is_valid_subsystem(subsystem) < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (!level) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, subsystem, "Unable to get write lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	*level = knet_h->log_levels[subsystem];
 
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 	return 0;
 }
 
 void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel,
 	     const char *fmt, ...)
 {
 	va_list ap;
 	struct knet_log_msg msg;
 	size_t byte_cnt = 0;
 	int len, err;
 
 	if ((!knet_h) ||
 	    (subsystem == KNET_MAX_SUBSYSTEMS) ||
 	    (msglevel > knet_h->log_levels[subsystem]))
 			return;
 
 	/*
 	 * most logging calls will take place with locking in place.
 	 * if we get an EINVAL and locking is initialized, then
 	 * we are getting a real error and we need to stop
 	 */
 	err = pthread_rwlock_tryrdlock(&knet_h->global_rwlock);
 	if ((err == EAGAIN) && (knet_h->lock_init_done))
 		return;
 
 	if (knet_h->logfd <= 0)
 		goto out_unlock;
 
 	memset(&msg, 0, sizeof(struct knet_log_msg));
 	msg.subsystem = subsystem;
 	msg.msglevel = msglevel;
 
 	va_start(ap, fmt);
 	vsnprintf(msg.msg, sizeof(msg.msg) - 2, fmt, ap);
 	va_end(ap);
 
 	len = strlen(msg.msg);
 	msg.msg[len+1] = '\n';
 
 	while (byte_cnt < sizeof(struct knet_log_msg)) {
 		len = write(knet_h->logfd, &msg, sizeof(struct knet_log_msg) - byte_cnt);
 		if (len <= 0) {
 			goto out_unlock;
 		}
 
 		byte_cnt += len;
 	}
 
 out_unlock:
 	/*
 	 * unlock only if we are holding the lock
 	 */
 	if (!err)
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	return;
 }
diff --git a/libknet/onwire.h b/libknet/onwire.h
index 5d51701c..50de9af9 100644
--- a/libknet/onwire.h
+++ b/libknet/onwire.h
@@ -1,201 +1,202 @@
 /*
  * Copyright (C) 2010-2015 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #ifndef __KNET_ONWIRE_H__
 #define __KNET_ONWIRE_H__
 
 /*
  * data structures to define network packets.
  * Start from knet_header at the bottom
  */
 
 #include <stdint.h>
 
 #include "libknet.h"
 
 #if 0
 
 /*
  * for future protocol extension (re-switching table calculation)
  */
 
 struct knet_hinfo_link {
 	uint8_t			khl_link_id;
 	uint8_t			khl_link_dynamic;
 	uint8_t			khl_link_priority;
 	uint64_t		khl_link_latency;
 	char			khl_link_dst_ipaddr[KNET_MAX_HOST_LEN];
 	char			khl_link_dst_port[KNET_MAX_PORT_LEN];
 } __attribute__((packed));
 
 struct knet_hinfo_link_table {
 	knet_node_id_t		khlt_node_id;
 	uint8_t			khlt_local; /* we have this node connected locally */
 	struct knet_hinfo_link	khlt_link[KNET_MAX_LINK]; /* info we send about each link in the node */
 } __attribute__((packed));
 
 struct link_table {
 	knet_node_id_t		khdt_host_entries;
 	uint8_t			khdt_host_maps[0]; /* array of knet_hinfo_link_table[khdt_host_entries] */
 } __attribute__((packed));
 #endif
 
 #define KNET_HOSTINFO_LINK_STATUS_DOWN 0
 #define KNET_HOSTINFO_LINK_STATUS_UP   1
 
 struct knet_hostinfo_payload_link_status {
 	uint8_t		khip_link_status_link_id;	/* link id */
 	uint8_t		khip_link_status_status;	/* up/down status */
 } __attribute__((packed));
 
 /*
  * union to reference possible individual payloads
  */
 
 union knet_hostinfo_payload {
 	struct knet_hostinfo_payload_link_status knet_hostinfo_payload_link_status;
 } __attribute__((packed));
 
 /*
  * due to the nature of knet_hostinfo, we are currently
  * sending those data as part of knet_header_payload_data.khp_data_userdata
  * and avoid a union that increses knet_header_payload_data size
  * unnecessarely.
  * This might change later on depending on how we implement
  * host info exchange
  */
 
 #define KNET_HOSTINFO_TYPE_LINK_UP_DOWN 0 // UNUSED
 #define KNET_HOSTINFO_TYPE_LINK_TABLE   1 // NOT IMPLEMENTED
 
 #define KNET_HOSTINFO_UCAST 0	/* send info to a specific host */
 #define KNET_HOSTINFO_BCAST 1	/* send info to all known / connected hosts */
 
 struct knet_hostinfo {
 	uint8_t				khi_type;	/* type of hostinfo we are sending */
 	uint8_t				khi_bcast;	/* hostinfo destination bcast/ucast */
 	knet_node_id_t			khi_dst_node_id;/* used only if in ucast mode */
 	union knet_hostinfo_payload	khi_payload;
 } __attribute__((packed));
 
 #define KNET_HOSTINFO_ALL_SIZE sizeof(struct knet_hostinfo)
 #define KNET_HOSTINFO_SIZE (KNET_HOSTINFO_ALL_SIZE - sizeof(union knet_hostinfo_payload))
 #define KNET_HOSTINFO_LINK_STATUS_SIZE (KNET_HOSTINFO_SIZE + sizeof(struct knet_hostinfo_payload_link_status))
 
 #define khip_link_status_status khi_payload.knet_hostinfo_payload_link_status.khip_link_status_status
 #define khip_link_status_link_id khi_payload.knet_hostinfo_payload_link_status.khip_link_status_link_id
 
 /*
  * typedef uint64_t seq_num_t;
  * #define SEQ_MAX UINT64_MAX
  */
 typedef uint16_t seq_num_t;
 #define SEQ_MAX UINT16_MAX
 
 struct knet_header_payload_data {
 	seq_num_t	khp_data_seq_num;	/* pckt seq number used to deduplicate pkcts */
+	uint8_t		khp_data_compress;	/* identify if user data are compressed */
 	uint8_t		khp_data_pad1;		/* make sure to have space in the header to grow features */
-	uint8_t		khp_data_pad2;
 	uint8_t		khp_data_bcast;		/* data destination bcast/ucast */
 	uint8_t		khp_data_frag_num;	/* number of fragments of this pckt. 1 is not fragmented */
 	uint8_t		khp_data_frag_seq;	/* as above, indicates the frag sequence number */
 	int8_t		khp_data_channel;	/* transport channel data for localsock <-> knet <-> localsock mapping */
 	uint8_t		khp_data_userdata[0];	/* pointer to the real user data */
 } __attribute__((packed));
 
 struct knet_header_payload_ping {
 	uint8_t		khp_ping_link;		/* source link id */
 	uint32_t	khp_ping_time[4];	/* ping timestamp */
 	seq_num_t	khp_ping_seq_num;	/* transport host seq_num */
 	uint8_t		khp_ping_timed;		/* timed pinged (1) or forced by seq_num (0) */
 }  __attribute__((packed));
 
 /* taken from tracepath6 */
 #define KNET_PMTUD_SIZE_V4 65535
 #define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4
 
 /* These two get the protocol-specific overheads added to them */
 #define KNET_PMTUD_OVERHEAD_V4 20
 #define KNET_PMTUD_OVERHEAD_V6 40
 
 #define KNET_PMTUD_MIN_MTU_V4 576
 #define KNET_PMTUD_MIN_MTU_V6 1280
 
 struct knet_header_payload_pmtud {
 	uint8_t		khp_pmtud_link;		/* source link id */
 	uint16_t	khp_pmtud_size;		/* size of the current packet */
 	uint8_t		khp_pmtud_data[0];	/* pointer to empty/random data/fill buffer */
 } __attribute__((packed));
 
 /*
  * union to reference possible individual payloads
  */
 
 union knet_header_payload {
 	struct knet_header_payload_data		khp_data;  /* pure data packet struct */
 	struct knet_header_payload_ping		khp_ping;  /* heartbeat packet struct */
 	struct knet_header_payload_pmtud 	khp_pmtud; /* Path MTU discovery packet struct */
 } __attribute__((packed));
 
 /*
  * starting point
  */
 
 #define KNET_HEADER_VERSION          0x01 /* we currently support only one version */
 
 #define KNET_HEADER_TYPE_DATA        0x00 /* pure data packet */
 #define KNET_HEADER_TYPE_HOST_INFO   0x01 /* host status information pckt */
 
 #define KNET_HEADER_TYPE_PMSK        0x80 /* packet mask */
 #define KNET_HEADER_TYPE_PING        0x81 /* heartbeat */
 #define KNET_HEADER_TYPE_PONG        0x82 /* reply to heartbeat */
 #define KNET_HEADER_TYPE_PMTUD       0x83 /* Used to determine Path MTU */
 #define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */
 
 struct knet_header {
 	uint8_t				kh_version; /* pckt format/version */
 	uint8_t				kh_type;    /* from above defines. Tells what kind of pckt it is */
 	knet_node_id_t			kh_node;    /* host id of the source host for this pckt */
 	uint8_t				kh_pad1;    /* make sure to have space in the header to grow features */
 	uint8_t				kh_pad2;
 	union knet_header_payload	kh_payload; /* union of potential data struct based on kh_type */
 } __attribute__((packed));
 
 /*
  * commodoty defines to hide structure nesting
  * (needs review and cleanup)
  */
 
 #define khp_data_seq_num  kh_payload.khp_data.khp_data_seq_num
 #define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num
 #define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq
 #define khp_data_userdata kh_payload.khp_data.khp_data_userdata
 #define khp_data_bcast    kh_payload.khp_data.khp_data_bcast
 #define khp_data_channel  kh_payload.khp_data.khp_data_channel
+#define khp_data_compress kh_payload.khp_data.khp_data_compress
 
 #define khp_ping_link     kh_payload.khp_ping.khp_ping_link
 #define khp_ping_time     kh_payload.khp_ping.khp_ping_time
 #define khp_ping_seq_num  kh_payload.khp_ping.khp_ping_seq_num
 #define khp_ping_timed    kh_payload.khp_ping.khp_ping_timed
 
 #define khp_pmtud_link    kh_payload.khp_pmtud.khp_pmtud_link
 #define khp_pmtud_size    kh_payload.khp_pmtud.khp_pmtud_size
 #define khp_pmtud_data    kh_payload.khp_pmtud.khp_pmtud_data
 
 /*
  * extra defines to avoid mingling with sizeof() too much
  */
 
 #define KNET_HEADER_ALL_SIZE sizeof(struct knet_header)
 #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload))
 #define KNET_HEADER_PING_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping))
 #define KNET_HEADER_PMTUD_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud))
 #define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data))
 
 #endif
diff --git a/libknet/tests/api-check.mk b/libknet/tests/api-check.mk
index 72ff8a2e..f1cc8ff5 100644
--- a/libknet/tests/api-check.mk
+++ b/libknet/tests/api-check.mk
@@ -1,223 +1,227 @@
 #
 # Copyright (C) 2016 Red Hat, Inc.  All rights reserved.
 #
 # Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
 #
 # This software licensed under GPL-2.0+, LGPL-2.0+
 #
 
 api_checks		= \
 			  api_knet_handle_new_test \
 			  api_knet_handle_free_test \
+			  api_knet_handle_compress_test \
 			  api_knet_handle_crypto_test \
 			  api_knet_handle_setfwd_test \
 			  api_knet_handle_enable_filter_test \
 			  api_knet_handle_enable_sock_notify_test \
 			  api_knet_handle_add_datafd_test \
 			  api_knet_handle_remove_datafd_test \
 			  api_knet_handle_get_channel_test \
 			  api_knet_handle_get_datafd_test \
 			  api_knet_handle_get_transport_list_test \
 			  api_knet_handle_get_transport_name_by_id_test \
 			  api_knet_handle_get_transport_id_by_name_test \
 			  api_knet_handle_set_transport_reconnect_interval_test \
 			  api_knet_handle_get_transport_reconnect_interval_test \
 			  api_knet_recv_test \
 			  api_knet_send_test \
 			  api_knet_send_sync_test \
 			  api_knet_send_loopback_test \
 			  api_knet_handle_pmtud_setfreq_test \
 			  api_knet_handle_pmtud_getfreq_test \
 			  api_knet_handle_enable_pmtud_notify_test \
 			  api_knet_handle_pmtud_get_test \
 			  api_knet_host_add_test \
 			  api_knet_host_remove_test \
 			  api_knet_host_set_name_test \
 			  api_knet_host_get_name_by_host_id_test \
 			  api_knet_host_get_id_by_host_name_test \
 			  api_knet_host_get_host_list_test \
 			  api_knet_host_set_policy_test \
 			  api_knet_host_get_policy_test \
 			  api_knet_host_get_status_test \
 			  api_knet_host_enable_status_change_notify_test \
 			  api_knet_log_get_subsystem_name_test \
 			  api_knet_log_get_subsystem_id_test \
 			  api_knet_log_get_loglevel_name_test \
 			  api_knet_log_get_loglevel_id_test \
 			  api_knet_log_set_loglevel_test \
 			  api_knet_log_get_loglevel_test \
 			  api_knet_strtoaddr_test \
 			  api_knet_addrtostr_test \
 			  api_knet_link_set_config_test \
 			  api_knet_link_clear_config_test \
 			  api_knet_link_get_config_test \
 			  api_knet_link_set_ping_timers_test \
 			  api_knet_link_get_ping_timers_test \
 			  api_knet_link_set_pong_count_test \
 			  api_knet_link_get_pong_count_test \
 			  api_knet_link_set_priority_test \
 			  api_knet_link_get_priority_test \
 			  api_knet_link_set_enable_test \
 			  api_knet_link_get_enable_test \
 			  api_knet_link_get_link_list_test \
 			  api_knet_link_get_status_test
 
 api_knet_handle_new_test_SOURCES = api_knet_handle_new.c \
 				   test-common.c
 
 api_knet_handle_free_test_SOURCES = api_knet_handle_free.c \
 				    test-common.c
 
+api_knet_handle_compress_test_SOURCES = api_knet_handle_compress.c \
+					test-common.c
+
 api_knet_handle_crypto_test_SOURCES = api_knet_handle_crypto.c \
 				      test-common.c
 
 api_knet_handle_setfwd_test_SOURCES = api_knet_handle_setfwd.c \
 				      test-common.c
 
 api_knet_handle_enable_filter_test_SOURCES = api_knet_handle_enable_filter.c \
 					     test-common.c
 
 api_knet_handle_enable_sock_notify_test_SOURCES = api_knet_handle_enable_sock_notify.c \
 						  test-common.c
 
 api_knet_handle_add_datafd_test_SOURCES = api_knet_handle_add_datafd.c \
 					  test-common.c
 
 api_knet_handle_remove_datafd_test_SOURCES = api_knet_handle_remove_datafd.c \
 					     test-common.c
 
 api_knet_handle_get_channel_test_SOURCES = api_knet_handle_get_channel.c \
 					   test-common.c
 
 api_knet_handle_get_datafd_test_SOURCES = api_knet_handle_get_datafd.c \
 					  test-common.c
 
 api_knet_handle_get_transport_list_test_SOURCES = api_knet_handle_get_transport_list.c \
 						  test-common.c
 
 api_knet_handle_get_transport_name_by_id_test_SOURCES = api_knet_handle_get_transport_name_by_id.c \
 							test-common.c
 
 api_knet_handle_get_transport_id_by_name_test_SOURCES = api_knet_handle_get_transport_id_by_name.c \
 							test-common.c
 
 api_knet_handle_set_transport_reconnect_interval_test_SOURCES = api_knet_handle_set_transport_reconnect_interval.c \
 								test-common.c
 
 api_knet_handle_get_transport_reconnect_interval_test_SOURCES = api_knet_handle_get_transport_reconnect_interval.c \
 								test-common.c
 
 api_knet_recv_test_SOURCES = api_knet_recv.c \
 			     test-common.c
 
 api_knet_send_test_SOURCES = api_knet_send.c \
 			     test-common.c
 
 api_knet_send_loopback_test_SOURCES = api_knet_send_loopback.c \
 			     test-common.c
 
 api_knet_send_sync_test_SOURCES = api_knet_send_sync.c \
 				  test-common.c
 
 api_knet_handle_pmtud_setfreq_test_SOURCES = api_knet_handle_pmtud_setfreq.c \
 					     test-common.c
 
 api_knet_handle_pmtud_getfreq_test_SOURCES = api_knet_handle_pmtud_getfreq.c \
 					     test-common.c
 
 api_knet_handle_enable_pmtud_notify_test_SOURCES = api_knet_handle_enable_pmtud_notify.c \
 						   test-common.c
 
 api_knet_handle_pmtud_get_test_SOURCES = api_knet_handle_pmtud_get.c \
 					 test-common.c
 
 api_knet_host_add_test_SOURCES = api_knet_host_add.c \
 				 test-common.c
 
 api_knet_host_remove_test_SOURCES = api_knet_host_remove.c \
 				    test-common.c
 
 api_knet_host_set_name_test_SOURCES = api_knet_host_set_name.c \
 				      test-common.c
 
 api_knet_host_get_name_by_host_id_test_SOURCES = api_knet_host_get_name_by_host_id.c \
 						 test-common.c
 
 api_knet_host_get_id_by_host_name_test_SOURCES = api_knet_host_get_id_by_host_name.c \
 						 test-common.c
 
 api_knet_host_get_host_list_test_SOURCES = api_knet_host_get_host_list.c \
 					   test-common.c
 
 api_knet_host_set_policy_test_SOURCES = api_knet_host_set_policy.c \
 					test-common.c
 
 api_knet_host_get_policy_test_SOURCES = api_knet_host_get_policy.c \
 					test-common.c
 
 api_knet_host_get_status_test_SOURCES = api_knet_host_get_status.c \
 					test-common.c
 
 api_knet_host_enable_status_change_notify_test_SOURCES = api_knet_host_enable_status_change_notify.c \
 							 test-common.c
 
 api_knet_log_get_subsystem_name_test_SOURCES = api_knet_log_get_subsystem_name.c \
 					       test-common.c
 
 api_knet_log_get_subsystem_id_test_SOURCES = api_knet_log_get_subsystem_id.c \
 					     test-common.c
 
 api_knet_log_get_loglevel_name_test_SOURCES = api_knet_log_get_loglevel_name.c \
 					      test-common.c
 
 api_knet_log_get_loglevel_id_test_SOURCES = api_knet_log_get_loglevel_id.c \
 					    test-common.c
 
 api_knet_log_set_loglevel_test_SOURCES = api_knet_log_set_loglevel.c \
 					 test-common.c
 
 api_knet_log_get_loglevel_test_SOURCES = api_knet_log_get_loglevel.c \
 					 test-common.c
 
 api_knet_strtoaddr_test_SOURCES = api_knet_strtoaddr.c
 
 api_knet_addrtostr_test_SOURCES = api_knet_addrtostr.c
 
 api_knet_link_set_config_test_SOURCES = api_knet_link_set_config.c \
 					test-common.c
 
 api_knet_link_clear_config_test_SOURCES = api_knet_link_clear_config.c \
 					  test-common.c
 
 api_knet_link_get_config_test_SOURCES = api_knet_link_get_config.c \
 					test-common.c
 
 api_knet_link_set_ping_timers_test_SOURCES = api_knet_link_set_ping_timers.c \
 					     test-common.c
 
 api_knet_link_get_ping_timers_test_SOURCES = api_knet_link_get_ping_timers.c \
 					     test-common.c
 
 api_knet_link_set_pong_count_test_SOURCES = api_knet_link_set_pong_count.c \
 					    test-common.c
 
 api_knet_link_get_pong_count_test_SOURCES = api_knet_link_get_pong_count.c \
 					    test-common.c
 
 api_knet_link_set_priority_test_SOURCES = api_knet_link_set_priority.c \
 					  test-common.c
 
 api_knet_link_get_priority_test_SOURCES = api_knet_link_get_priority.c \
 					  test-common.c
 
 api_knet_link_set_enable_test_SOURCES = api_knet_link_set_enable.c \
 					test-common.c
 
 api_knet_link_get_enable_test_SOURCES = api_knet_link_get_enable.c \
 					test-common.c
 
 api_knet_link_get_link_list_test_SOURCES = api_knet_link_get_link_list.c \
 					   test-common.c
 
 api_knet_link_get_status_test_SOURCES = api_knet_link_get_status.c \
 					test-common.c
diff --git a/libknet/tests/api_knet_handle_compress.c b/libknet/tests/api_knet_handle_compress.c
new file mode 100644
index 00000000..0a6e90f7
--- /dev/null
+++ b/libknet/tests/api_knet_handle_compress.c
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2016 Red Hat, Inc.  All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under GPL-2.0+, LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "libknet.h"
+
+#include "internals.h"
+#include "test-common.h"
+
+static void test(void)
+{
+	knet_handle_t knet_h;
+	int logfds[2];
+	struct knet_handle_compress_cfg knet_handle_compress_cfg;
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+
+	printf("Test knet_handle_compress incorrect knet_h\n");
+
+	if ((!knet_handle_compress(NULL, &knet_handle_compress_cfg)) || (errno != EINVAL)) {
+		printf("knet_handle_compress accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
+		exit(FAIL);
+	}
+
+	setup_logpipes(logfds);
+
+	knet_h = knet_handle_new(1, logfds[1], KNET_LOG_DEBUG);
+
+	if (!knet_h) {
+		printf("knet_handle_new failed: %s\n", strerror(errno));
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with invalid cfg\n");
+
+	if ((!knet_handle_compress(knet_h, NULL)) || (errno != EINVAL)) {
+		printf("knet_handle_compress accepted invalid cfg or returned incorrect error: %s\n", strerror(errno));
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with un-initialized cfg\n");
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+
+	if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) {
+		printf("knet_handle_compress accepted invalid un-initialized cfg\n");
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with none compress model (disable compress)\n");
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+	strncpy(knet_handle_compress_cfg.compress_model, "none", sizeof(knet_handle_compress_cfg.compress_model) - 1);
+
+	if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) {
+		printf("knet_handle_compress did not accept none compress mode cfg\n");
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with zlib compress and negative level\n");
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+	strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1);
+	knet_handle_compress_cfg.compress_level = -1;
+
+	if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) {
+		printf("knet_handle_compress accepted invalid (-1) compress level for zlib\n");
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with zlib compress and excessive compress level\n");
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+	strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1);
+	knet_handle_compress_cfg.compress_level = 10;
+
+	if ((!knet_handle_compress(knet_h, &knet_handle_compress_cfg)) || (errno != EINVAL)) {
+		printf("knet_handle_compress accepted invalid (10) compress level for zlib\n");
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	printf("Test knet_handle_compress with zlib compress model normal compress level)\n");
+
+	memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+	strncpy(knet_handle_compress_cfg.compress_model, "zlib", sizeof(knet_handle_compress_cfg.compress_model) - 1);
+	knet_handle_compress_cfg.compress_level = 1;
+
+	if (knet_handle_compress(knet_h, &knet_handle_compress_cfg) != 0) {
+		printf("knet_handle_compress did not accept zlib compress mode with compress level 1 cfg\n");
+		knet_handle_free(knet_h);
+		flush_logs(logfds[0], stdout);
+		close_logpipes(logfds);
+		exit(FAIL);
+	}
+
+	flush_logs(logfds[0], stdout);
+
+	knet_handle_free(knet_h);
+	flush_logs(logfds[0], stdout);
+	close_logpipes(logfds);
+}
+
+int main(int argc, char *argv[])
+{
+	need_root();
+
+	test();
+
+	return PASS;
+}
diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c
index e6ca21c1..d2406914 100644
--- a/libknet/tests/knet_bench.c
+++ b/libknet/tests/knet_bench.c
@@ -1,995 +1,1025 @@
 /*
  * Copyright (C) 2016 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <signal.h>
 #include <pthread.h>
 #include <sys/types.h>
 #include <inttypes.h>
 
 #include "libknet.h"
 
 #include "compat.h"
 #include "internals.h"
 #include "netutils.h"
 #include "transports.h"
 #include "threads_common.h"
 #include "test-common.h"
 
 #define MAX_NODES 128
 
 static int senderid = -1;
 static int thisnodeid = -1;
 static knet_handle_t knet_h;
 static int datafd = 0;
 static int8_t channel = 0;
 static int globallistener = 0;
 static int continous = 0;
 static struct sockaddr_storage allv4;
 static struct sockaddr_storage allv6;
 static int broadcast_test = 1;
 static pthread_t rx_thread = (pthread_t)NULL;
 static char *rx_buf[PCKT_FRAG_MAX];
 static int wait_for_perf_rx = 0;
+static char *compresscfg = NULL;
 
 static int bench_shutdown_in_progress = 0;
 static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 #define TEST_PING 0
 #define TEST_PING_AND_DATA 1
 #define TEST_PERF_BY_SIZE 2
 #define TEST_PERF_BY_TIME 3
 
 static int test_type = TEST_PING;
 
 #define TEST_START 2
 #define TEST_STOP 4
 #define TEST_COMPLETE 6
 
 #define ONE_GIGABYTE 1073741824
 
 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE;
 static uint64_t perf_by_time_secs = 10;
 
 struct node {
 	int nodeid;
 	int links;
 	struct sockaddr_storage address[KNET_MAX_LINK];
 };
 
 static void print_help(void)
 {
 	printf("knet_bench usage:\n");
 	printf(" -h                                        print this help (no really)\n");
 	printf(" -d                                        enable debug logs (default INFO)\n");
 	printf(" -c [implementation]:[crypto]:[hashing]    crypto configuration. (default disabled)\n");
 	printf("                                           Example: -c nss:aes128:sha1\n");
+	printf(" -z [implementation]:[level]               compress configuration. (default disabled)\n");
+	printf("                                           Example: -z zlib:5\n");
 	printf(" -p [active|passive|rr]                    (default: passive)\n");
 	printf(" -P [udp|sctp]                             (default: udp) protocol (transport) to use\n");
 	printf(" -t [nodeid]                               This nodeid (required)\n");
 	printf(" -n [nodeid],[link1_ip_addr],[link2_..]    Other nodes information (at least one required)\n");
 	printf("                                           Example: -t 1,192.168.8.1,3ffe::8:1,..\n");
 	printf("                                           can be repeated up to %d and should contain also the localnode info\n", MAX_NODES);
 	printf(" -b [port]                                 baseport (default: 50000)\n");
 	printf(" -l                                        enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n");
 	printf(" -o                                        enable baseport offset per nodeid\n");
 	printf(" -w                                        dont wait for all nodes to be up before starting the test (default: wait)\n");
 	printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n");
 	printf("                                           test type (default: ping)\n");
 	printf("                                           ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n");
 	printf("                                           ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n");
 	printf("                                           perf-by-size: will wait for all hosts to join the knet network,\n");
 	printf("                                                         perform a series of benchmarks by transmitting a known\n");
 	printf("                                                         size/quantity of packets and measuring the time, then quit\n");
 	printf("                                           perf-by-time: will wait for all hosts to join the knet network,\n");
 	printf("                                                         perform a series of benchmarks by transmitting a known\n");
 	printf("                                                         size of packets for a given amount of time (10 seconds)\n");
 	printf("                                                         and measuring the quantity of data transmitted, then quit\n");
 	printf(" -s                                        nodeid that will generate traffic for benchmarks\n");
 	printf(" -S [size|seconds]                         when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n");
 	printf("                                           when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n");
 	printf(" -C                                        repeat the test continously (default: off)\n");
 }
 
 static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx)
 {
 	int i;
 	char *temp = NULL;
 	char port_str[10];
 
 	memset(port_str, 0, sizeof(port_str));
 	sprintf(port_str, "%d", port);
 
 	for (i = 0; i < onidx; i++) {
 		nodes[i].nodeid = atoi(strtok(nodesinfo[i], ","));
 		if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) {
 			printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST);
 			exit(FAIL);
 		}
 		if (thisnodeid == nodes[i].nodeid) {
 			*thisidx = i;
 		}
 		while((temp = strtok(NULL, ","))) {
 			if (nodes[i].links == KNET_MAX_LINK) {
 				printf("Too many links configured. Max %d\n", KNET_MAX_LINK);
 				exit(FAIL);
 			}
 			if (knet_strtoaddr(temp, port_str,
 					   &nodes[i].address[nodes[i].links],
 					   sizeof(struct sockaddr_storage)) < 0) {
 				printf("Unable to convert %s to sockaddress\n", temp);
 				exit(FAIL);
 			}
 			nodes[i].links++;
 		}
 	}
 
 	if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) {
 		printf("Unable to convert 0.0.0.0 to sockaddress\n");
 		exit(FAIL);
 	}
 
 	if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) {
 		printf("Unable to convert :: to sockaddress\n");
 		exit(FAIL);
 	}
 
 	for (i = 1; i < onidx; i++) {
 		if (nodes[0].links != nodes[i].links) {
 			printf("knet_bench does not support unbalanced link configuration\n");
 			exit(FAIL);
 		}
 	}
 
 	return;
 }
 
 static int private_data;
 
 static void sock_notify(void *pvt_data,
 			int local_datafd,
 			int8_t local_channel,
 			uint8_t tx_rx,
 			int error,
 			int errorno)
 {
 	printf("Error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd);
 	return;
 }
 
 static int ping_dst_host_filter(void *pvt_data,
 				const unsigned char *outdata,
 				ssize_t outdata_len,
 				uint8_t tx_rx,
 				knet_node_id_t this_host_id,
 				knet_node_id_t src_host_id,
 				int8_t *dst_channel,
 				knet_node_id_t *dst_host_ids,
 				size_t *dst_host_ids_entries)
 {
 	if (broadcast_test) {
 		return 1;
 	}
 
 	if (tx_rx == KNET_NOTIFY_TX) {
 		memmove(&dst_host_ids[0], outdata, 2);
 	} else {
 		dst_host_ids[0] = this_host_id;
 	}
 	*dst_host_ids_entries = 1;
 	return 0;
 }
 
 static void setup_knet(int argc, char *argv[])
 {
 	int logfd;
 	int rv;
 	char *cryptocfg = NULL, *policystr = NULL, *protostr = NULL;
 	char *othernodeinfo[MAX_NODES];
 	struct node nodes[MAX_NODES];
 	int thisidx = -1;
 	int onidx = 0;
 	int debug = KNET_LOG_INFO;
 	int port = 50000, portoffset = 0;
 	int thisport = 0, otherport = 0;
 	int thisnewport = 0, othernewport = 0;
 	struct sockaddr_in *so_in;
 	struct sockaddr_in6 *so_in6;
 	struct sockaddr_storage *src;
 	int i, link_idx, allnodesup = 0;
 	int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0;
 	int protocol = KNET_TRANSPORT_UDP, protofound = 0;
 	int wait = 1;
 	struct knet_handle_crypto_cfg knet_handle_crypto_cfg;
 	char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL;
+	struct knet_handle_compress_cfg knet_handle_compress_cfg;
 
 	memset(nodes, 0, sizeof(nodes));
 
-	while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:P:h")) != EOF) {
+	while ((rv = getopt(argc, argv, "CT:S:s:ldowb:t:n:c:p:P:z:h")) != EOF) {
 		switch(rv) {
 			case 'h':
 				print_help();
 				exit(PASS);
 				break;
 			case 'd':
 				debug = KNET_LOG_DEBUG;
 				break;
 			case 'c':
 				if (cryptocfg) {
 					printf("Error: -c can only be specified once\n");
 					exit(FAIL);
 				}
 				cryptocfg = optarg;
 				break;
 			case 'p':
 				if (policystr) {
 					printf("Error: -p can only be specified once\n");
 					exit(FAIL);
 				}
 				policystr = optarg;
 				if (!strcmp(policystr, "active")) {
 					policy = KNET_LINK_POLICY_ACTIVE;
 					policyfound = 1;
 				}
 				/*
 				 * we can't use rr because clangs can't compile
 				 * an array of 3 strings, one of which is 2 bytes long
 				 */
 				if (!strcmp(policystr, "round-robin")) {
 					policy = KNET_LINK_POLICY_RR;
 					policyfound = 1;
 				}
 				if (!strcmp(policystr, "passive")) {
 					policy = KNET_LINK_POLICY_PASSIVE;
 					policyfound = 1;
 				}
 				if (!policyfound) {
 					printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr);
 					exit(FAIL);
 				}
 				break;
 		        case 'P':
 				if (protostr) {
 					printf("Error: -P can only be specified once\n");
 					exit(FAIL);
 				}
 				protostr = optarg;
 				if (!strcmp(protostr, "udp")) {
 					protocol = KNET_TRANSPORT_UDP;
 					protofound = 1;
 				}
 				if (!strcmp(protostr, "sctp")) {
 					protocol = KNET_TRANSPORT_SCTP;
 					protofound = 1;
 				}
 				if (!protofound) {
 					printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr);
 					exit(FAIL);
 				}
 				break;
 			case 't':
 				if (thisnodeid >= 0) {
 					printf("Error: -t can only be specified once\n");
 					exit(FAIL);
 				}
 				thisnodeid = atoi(optarg);
 				if ((thisnodeid < 0) || (thisnodeid > 65536)) {
 					printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid);
                                         exit(FAIL);
 				}
 				break;
 			case 'n':
 				if (onidx == MAX_NODES) {
 					printf("Error: too many other nodes. Max %d\n", MAX_NODES);
 					exit(FAIL);
 				}
 				othernodeinfo[onidx] = optarg;
 				onidx++;
 				break;
 			case 'b':
 				port = atoi(optarg);
 				if ((port < 1) || (port > 65536)) {
 					printf("Error: port %d out of range (1 - 65536)\n", port);
 					exit(FAIL);
 				}
 				break;
 			case 'o':
 				if (globallistener) {
 					printf("Error: -l cannot be used with -o\n");
 					exit(FAIL);
 				}
 				portoffset = 1;
 				break;
 			case 'l':
 				if (portoffset) {
 					printf("Error: -o cannot be used with -l\n");
 					exit(FAIL);
 				}
 				globallistener = 1;
 				break;
 			case 'w':
 				wait = 0;
 				break;
 			case 's':
 				if (senderid >= 0) {
 					printf("Error: -s can only be specified once\n");
 					exit(FAIL);
 				}
 				senderid = atoi(optarg);
 				if ((senderid < 0) || (senderid > 65536)) {
 					printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid);
                                         exit(FAIL);
 				}
 				break;
 			case 'T':
 				if (!strcmp("ping", optarg)) {
 					test_type = TEST_PING;
 				}
 				if (!strcmp("ping_data", optarg)) {
 					test_type = TEST_PING_AND_DATA;
 				}
 				if (!strcmp("perf-by-size", optarg)) {
 					test_type = TEST_PERF_BY_SIZE;
 				}
 				if (!strcmp("perf-by-time", optarg)) {
 					test_type = TEST_PERF_BY_TIME;
 				}
 				break;
 			case 'S':
 				perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE;
 				perf_by_time_secs = (uint64_t)atoi(optarg);
 				break;
 			case 'C':
 				continous = 1;
 				break;
+			case 'z':
+				if (compresscfg) {
+					printf("Error: -c can only be specified once\n");
+					exit(FAIL);
+				}
+				compresscfg = optarg;
+				break;
 			default:
 				break;
 		}
 	}
 
 	if (thisnodeid < 0) {
 		printf("Who am I?!? missing -t from command line?\n");
 		exit(FAIL);
 	}
 
 	if (onidx < 1) {
 		printf("no other nodes configured?!? missing -n from command line\n");
 		exit(FAIL);
 	}
 
 	parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx);
 
 	if (thisidx < 0) {
 		printf("no config for this node found\n");
 		exit(FAIL);
 	}
 
 	if (senderid >= 0) {
 		for (i=0; i < onidx; i++) {
 			if (senderid == nodes[i].nodeid) {
 				break;
 			}
 		}
 		if (i == onidx) {
 			printf("Unable to find senderid in nodelist\n");
 			exit(FAIL);
 		}
 	}
 
 	if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) {
 		printf("Error: performance test requires -s to be set (for now)\n");
 		exit(FAIL);
 	}
 
 	logfd = start_logging(stdout);
 
 	knet_h = knet_handle_new(thisnodeid, logfd, debug);
 	if (!knet_h) {
 		printf("Unable to knet_handle_new: %s\n", strerror(errno));
 		exit(FAIL);
 	}
 
 	if (cryptocfg) {
 		memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg));
 		cryptomodel = strtok(cryptocfg, ":");
 		cryptotype = strtok(NULL, ":");
 		cryptohash = strtok(NULL, ":");
 		if (cryptomodel) {
 			strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1);
 		}
 		if (cryptotype) {
 			strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1);
 		}
 		if (cryptohash) {
 			strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1);
 		}
 		knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN;
 		if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) {
 			printf("Unable to init crypto\n");
 			exit(FAIL);
 		}
 	}
 
+	if (compresscfg) {
+		memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
+		snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":"));
+		knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":"));
+		if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) {
+			printf("Unable to configure compress\n");
+			exit(FAIL);
+		}
+	}
+
 	if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) {
 		printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno));
 		knet_handle_free(knet_h);
 		exit(FAIL);
         }
 
 	datafd = 0;
 	channel = -1;
 
 	if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) {
 		printf("knet_handle_add_datafd failed: %s\n", strerror(errno));
 		knet_handle_free(knet_h);
 		exit(FAIL);
 	}
 
 	if (knet_handle_pmtud_setfreq(knet_h, 60) < 0) {
 		printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno));
 		knet_handle_free(knet_h);
 		exit(FAIL);
 	}
 
 	for (i=0; i < onidx; i++) {
 		if (i == thisidx) {
 			continue;
 		}
 
 		if (knet_host_add(knet_h, nodes[i].nodeid) < 0) {
 			printf("knet_host_add failed: %s\n", strerror(errno));
 			exit(FAIL);
 		}
 
 		if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) {
 			printf("knet_host_set_policy failed: %s\n", strerror(errno));
 			exit(FAIL);
 		}
 
 		for (link_idx = 0; link_idx < nodes[i].links; link_idx++) {
 			if (portoffset) {
 				if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
 					so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
 					thisport = ntohs(so_in->sin_port);
 					thisnewport = thisport + nodes[i].nodeid;
 					so_in->sin_port = (htons(thisnewport));
 					so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
 					otherport = ntohs(so_in->sin_port);
 					othernewport = otherport + nodes[thisidx].nodeid;
 					so_in->sin_port = (htons(othernewport));
 				} else {
 					so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
 					thisport = ntohs(so_in6->sin6_port);
 					thisnewport = thisport + nodes[i].nodeid;
 					so_in6->sin6_port = (htons(thisnewport));
 					so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
 					otherport = ntohs(so_in6->sin6_port);
 					othernewport = otherport + nodes[thisidx].nodeid;
 					so_in6->sin6_port = (htons(othernewport));
 				}
 			}
 			if (!globallistener) {
 				src = &nodes[thisidx].address[link_idx];
 			} else {
 				if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
 					src = &allv4;
 				} else {
 					src = &allv6;
 				}
 			}
 			if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx,
 						 protocol, src,
 						 &nodes[i].address[link_idx], 0) < 0) {
 				printf("Unable to configure link: %s\n", strerror(errno));
 				exit(FAIL);
 			}
 			if (portoffset) {
 				if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
 					so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
 					so_in->sin_port = (htons(thisport));
 					so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
 					so_in->sin_port = (htons(otherport));
 				} else {
 					so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
 					so_in6->sin6_port = (htons(thisport));
 					so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
 					so_in6->sin6_port = (htons(otherport));
 				}
 			}
 			if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) {
 				printf("knet_link_set_enable failed: %s\n", strerror(errno));
 				exit(FAIL);
 			}
 			if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) {
 				printf("knet_link_set_ping_timers failed: %s\n", strerror(errno));
 				exit(FAIL);
 			}
 			if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) {
 				printf("knet_link_set_pong_count failed: %s\n", strerror(errno));
 				exit(FAIL);
 			}
 		}
 	}
 
 	if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
 		printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
 		exit(FAIL);
 	}
 
 	if (knet_handle_setfwd(knet_h, 1) < 0) {
 		printf("knet_handle_setfwd failed: %s\n", strerror(errno));
 		exit(FAIL);
 	}
 
 	if (wait) {
 		while(!allnodesup) {
 			allnodesup = 1;
 			for (i=0; i < onidx; i++) {
 				if (i == thisidx) {
 					continue;
 				}
 				if(knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) {
 					printf("waiting host %d to be reachable\n", nodes[i].nodeid);
 					allnodesup = 0;
 				}
 			}
 			if (!allnodesup) {
 				sleep(1);
 			}
 		}
 		sleep(1);
 	}
 }
 
 static void *_rx_thread(void *args)
 {
 	int rx_epoll;
 	struct epoll_event ev;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 
 	struct sockaddr_storage address[PCKT_FRAG_MAX];
 	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
 	struct iovec iov_in[PCKT_FRAG_MAX];
 	int i, msg_recv;
 	struct timespec clock_start, clock_end;
 	unsigned long long time_diff = 0;
 	uint64_t rx_pkts = 0;
 	uint64_t rx_bytes = 0;
 	unsigned int current_pckt_size = 0;
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
 		if (!rx_buf[i]) {
 			printf("RXT: Unable to malloc!\n");
 			return NULL;
 		}
 		memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE);
 		iov_in[i].iov_base = (void *)rx_buf[i];
 		iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
 		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
 		msg[i].msg_hdr.msg_name = &address[i];
 		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[i].msg_hdr.msg_iov = &iov_in[i];
 		msg[i].msg_hdr.msg_iovlen = 1;
 	}
 
 	rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
 	if (rx_epoll < 0) {
 		printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n");
 		return NULL;
 	}
 
 	memset(&ev, 0, sizeof(struct epoll_event));
 	ev.events = EPOLLIN;
 	ev.data.fd = datafd;
 
 	if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) {
 		printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n");
 		return NULL;
 	}
 
 	memset(&clock_start, 0, sizeof(clock_start));
 	memset(&clock_end, 0, sizeof(clock_start));
 
 	while (!bench_shutdown_in_progress) {
 		if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) {
 			msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL);
 			if (msg_recv < 0) {
 				printf("RXT: error from recvmmsg: %s\n", strerror(errno));
 			}
 			switch(test_type) {
 				case TEST_PING_AND_DATA:
 					for (i = 0; i < msg_recv; i++) {
 						if (msg[i].msg_len == 0) {
 							printf("RXT: received 0 bytes message?\n");
 						}
 						printf("received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base);
 					}
 					break;
 				case TEST_PERF_BY_TIME:
 				case TEST_PERF_BY_SIZE:
 					for (i = 0; i < msg_recv; i++) {
 						if (msg[i].msg_len < 64) {
 							if (msg[i].msg_len == 0) {
 								printf("RXT: received 0 bytes message?\n");
 							}
 							if (msg[i].msg_len == TEST_START) {
 								if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
 									printf("Unable to get start time!\n");
 								}
 							}
 							if (msg[i].msg_len == TEST_STOP) {
 								double average_rx_mbytes;
 								double average_rx_pkts;
 								double time_diff_sec;
 								if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
 									printf("Unable to get end time!\n");
 								}
 								timespec_diff(clock_start, clock_end, &time_diff);
 								/*
 								 * adjust for sleep(2) between sending the last data and TEST_STOP
 								 */
 								time_diff = time_diff - 2000000000llu;
 
 								/*
 								 * convert to seconds
 								 */
 								time_diff_sec = (double)time_diff / 1000000000llu;
 
 								average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024));
 								average_rx_pkts = (double)(rx_pkts / time_diff_sec);
 								printf("Execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n", time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts);
 								rx_pkts = 0;
 								rx_bytes = 0;
 								current_pckt_size = 0;
 							}
 							if (msg[i].msg_len == TEST_COMPLETE) {
 								wait_for_perf_rx = 1;
 							}
 							continue;
 						}
 						rx_pkts++;
 						rx_bytes = rx_bytes + msg[i].msg_len;
 						current_pckt_size = msg[i].msg_len;
 					}
 					break;
 			}
 		}
 	}
 
 	epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev);
 	close(rx_epoll);
 
 	return NULL;
 }
 
 static void setup_data_txrx_common(void)
 {
 	if (!rx_thread) {
 		if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
 			printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
 			exit(FAIL);
 		}
 		printf("Setting up rx thread\n");
 		if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) {
 			printf("Unable to start rx thread\n");
 			exit(FAIL);
 		}
 	}
 }
 
 static void stop_rx_thread(void)
 {
 	void *retval;
 	int i;
 
 	if (rx_thread) {
 		printf("Shutting down rx thread\n");
 		sleep(2);
 		pthread_cancel(rx_thread);
 		pthread_join(rx_thread, &retval);
 		for (i = 0; i < PCKT_FRAG_MAX; i ++) {
 			free(rx_buf[i]);
 		}
 	}
 }
 
 static void send_ping_data(void)
 {
-	const char *buf = "Hello world!\x0";
-	ssize_t len = strlen(buf);
+	char buf[65535];
+	ssize_t len;
+
+	memset(&buf, 0, sizeof(buf));
+	snprintf(buf, sizeof(buf), "Hello world!");
+
+	if (compresscfg) {
+		len = sizeof(buf);
+	} else {
+		len = strlen(buf);
+	}
 
 	if (knet_send(knet_h, buf, len, channel) != len) {
 		printf("Error sending hello world: %s\n", strerror(errno));
 	}
 	sleep(1);
 }
 
 static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send)
 {
 	int sent_msgs, prev_sent, progress, total_sent;
 
 	total_sent = 0;
 	sent_msgs = 0;
 	prev_sent = 0;
 	progress = 1;
 
 retry:
 	errno = 0;
 	sent_msgs = _sendmmsg(datafd, &msg[0], msgs_to_send, MSG_NOSIGNAL);
 
 	if (sent_msgs < 0) {
 		if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
 			usleep(KNET_THREADS_TIMERES / 16);
 			goto retry;
 		}
 		printf("Unable to send messages: %s\n", strerror(errno));
 		return -1;
 	}
 
 	total_sent = total_sent + sent_msgs;
 
 	if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) {
 		if ((sent_msgs) || (progress)) {
 			msgs_to_send = msgs_to_send - sent_msgs;
 			prev_sent = prev_sent + sent_msgs;
 			if (sent_msgs) {
 				progress = 1;
 			} else {
 				progress = 0;
 			}
 			goto retry;
 		}
 		if (!progress) {
 			printf("Unable to send more messages after retry\n");
 		}
 	}
 	return total_sent;
 }
 
 static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[])
 {
 	int i;
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
 		if (!tx_buf[i]) {
 			printf("TXT: Unable to malloc!\n");
 			return -1;
 		}
 		memset(tx_buf[i], 0, KNET_MAX_PACKET_SIZE);
 		iov_out[i].iov_base = (void *)tx_buf[i];
 		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
 		msg[i].msg_hdr.msg_iov = &iov_out[i];
 		msg[i].msg_hdr.msg_iovlen = 1;
 	}
 	return 0;
 }
 
 static void send_perf_data_by_size(void)
 {
 	char *tx_buf[PCKT_FRAG_MAX];
 	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
 	struct iovec iov_out[PCKT_FRAG_MAX];
 	char ctrl_message[16];
 	int sent_msgs;
 	int i;
 	uint64_t total_pkts_to_tx;
 	uint64_t packets_to_send;
 	uint32_t packetsize = 64;
 
 	setup_send_buffers_common(msg, iov_out, tx_buf);
 
 	while (packetsize <= KNET_MAX_PACKET_SIZE) {
 		for (i = 0; i < PCKT_FRAG_MAX; i++) {
 			iov_out[i].iov_len = packetsize;
 		}
 
 		total_pkts_to_tx = perf_by_size_size / packetsize;
 		printf("Testing with %u packet size. Total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx);
 
 		memset(ctrl_message, 0, sizeof(ctrl_message));
 		knet_send(knet_h, ctrl_message, TEST_START, channel);
 
 		while (total_pkts_to_tx > 0) {
 			if (total_pkts_to_tx >= PCKT_FRAG_MAX) {
 				packets_to_send = PCKT_FRAG_MAX;
 			} else {
 				packets_to_send = total_pkts_to_tx;
 			}
 			sent_msgs = send_messages(&msg[0], packets_to_send);
 			if (sent_msgs < 0) {
 				printf("Something went wrong, aborting\n");
 				exit(FAIL);
 			}
 			total_pkts_to_tx = total_pkts_to_tx - sent_msgs;
 		}
 
 		sleep(2);
 
 		knet_send(knet_h, ctrl_message, TEST_STOP, channel);
 
 		if (packetsize == KNET_MAX_PACKET_SIZE) {
 			break;
 		}
 
 		/*
 		 * Use a multiplier that can always divide properly a GB
 		 * into smaller chunks without worry about boundaries
 		 */
 		packetsize *= 4;
 
 		if (packetsize > KNET_MAX_PACKET_SIZE) {
 			packetsize = KNET_MAX_PACKET_SIZE;
 		}
 	}
 
 	knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		free(tx_buf[i]);
 	}
 }
 
 static void send_perf_data_by_time(void)
 {
 	char *tx_buf[PCKT_FRAG_MAX];
 	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
 	struct iovec iov_out[PCKT_FRAG_MAX];
 	char ctrl_message[16];
 	int sent_msgs;
 	int i;
-	uint32_t packetsize = 64;
+	uint32_t packetsize = 65536;
 	struct timespec clock_start, clock_end;
 	unsigned long long time_diff = 0;
 
 	setup_send_buffers_common(msg, iov_out, tx_buf);
 
 	memset(&clock_start, 0, sizeof(clock_start));
 	memset(&clock_end, 0, sizeof(clock_start));
 
 	while (packetsize <= KNET_MAX_PACKET_SIZE) {
 		for (i = 0; i < PCKT_FRAG_MAX; i++) {
 			iov_out[i].iov_len = packetsize;
 		}
 		printf("Testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs);
 
 		memset(ctrl_message, 0, sizeof(ctrl_message));
 		knet_send(knet_h, ctrl_message, TEST_START, channel);
 
 		if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
 			printf("Unable to get start time!\n");
 		}
 
 		time_diff = 0;
 
 		while (time_diff < (perf_by_time_secs * 1000000000llu)) {
 			sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX);
 			if (sent_msgs < 0) {
 				printf("Something went wrong, aborting\n");
 				exit(FAIL);
 			}
 			if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
 				printf("Unable to get end time!\n");
 			}
 			timespec_diff(clock_start, clock_end, &time_diff);
 		}
 
 		sleep(2);
 
 		knet_send(knet_h, ctrl_message, TEST_STOP, channel);
 
 		if (packetsize == KNET_MAX_PACKET_SIZE) {
 			break;
 		}
 
 		/*
 		 * Use a multiplier that can always divide properly a GB
 		 * into smaller chunks without worry about boundaries
 		 */
 		packetsize *= 4;
 
 		if (packetsize > KNET_MAX_PACKET_SIZE) {
 			packetsize = KNET_MAX_PACKET_SIZE;
 		}
 	}
 
 	knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		free(tx_buf[i]);
 	}
 }
 
 static void cleanup_all(void)
 {
 	if (pthread_mutex_lock(&shutdown_mutex)) {
 		return;
 	}
 
 	if (bench_shutdown_in_progress) {
 		pthread_mutex_unlock(&shutdown_mutex);
 		return;
 	}
 
 	bench_shutdown_in_progress = 1;
 
 	pthread_mutex_unlock(&shutdown_mutex);
 
 	if (rx_thread) {
 		stop_rx_thread();
 	}
 	knet_handle_stop(knet_h);
 }
 
 static void sigint_handler(int signum)
 {
 	printf("Cleaning up... got signal: %d\n", signum);
 	cleanup_all();
 	exit(PASS);
 }
 
 int main(int argc, char *argv[])
 {
 	if (signal(SIGINT, sigint_handler) == SIG_ERR) {
 		printf("Unable to configure SIGINT handler\n");
 		exit(FAIL);
 	}
 
 	need_root();
 
 	setup_knet(argc, argv);
 
 	setup_data_txrx_common();
 
 	sleep(5);
 
 restart:
 	switch(test_type) {
 		default:
 		case TEST_PING: /* basic ping, no data */
 			sleep(5);
 			break;
 		case TEST_PING_AND_DATA:
 			send_ping_data();
 			break;
 		case TEST_PERF_BY_SIZE:
 			if (senderid == thisnodeid) {
 				send_perf_data_by_size();
 			} else {
 				printf("Waiting for perf rx thread to finish\n");
 				while(!wait_for_perf_rx) {
 					sleep(1);
 				}
 			}
 			break;
 		case TEST_PERF_BY_TIME:
 			if (senderid == thisnodeid) {
 				send_perf_data_by_time();
 			} else {
 				printf("Waiting for perf rx thread to finish\n");
 				while(!wait_for_perf_rx) {
 					sleep(1);
 				}
 			}
 			break;
 	}
 	if (continous) {
 		goto restart;
 	}
 
 	cleanup_all();
 
 	return PASS;
 }
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 0a3ea700..35300245 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,760 +1,784 @@
 /*
  * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 #include <sys/uio.h>
 #include <pthread.h>
 
 #include "compat.h"
+#include "compress.h"
 #include "crypto.h"
 #include "host.h"
 #include "link.h"
 #include "logging.h"
 #include "transports.h"
 #include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_rx.h"
 #include "netutils.h"
 
 /*
  * RECV
  */
 
 /*
  *  return 1 if a > b
  *  return -1 if b > a
  *  return 0 if they are equal
  */
 static inline int timecmp(struct timespec a, struct timespec b)
 {
 	if (a.tv_sec != b.tv_sec) {
 		if (a.tv_sec > b.tv_sec) {
 			return 1;
 		} else {
 			return -1;
 		}
 	} else {
 		if (a.tv_nsec > b.tv_nsec) {
 			return 1;
 		} else if (a.tv_nsec < b.tv_nsec) {
 			return -1;
 		} else {
 			return 0;
 		}
 	}
 }
 
 /*
  * this functions needs to return an index (0 to 7)
  * to a knet_host_defrag_buf. (-1 on errors)
  */
 
 static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
 {
 	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
 	int i, oldest;
 
 	/*
 	 * check if there is a buffer already in use handling the same seq_num
 	 */
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (src_host->defrag_buf[i].in_use) {
 			if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
 				return i;
 			}
 		}
 	}
 
 	/*
 	 * If there is no buffer that's handling the current seq_num
 	 * either it's new or it's been reclaimed already.
 	 * check if it's been reclaimed/seen before using the defrag circular
 	 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
 	 * and there is no point to try to defrag it again.
 	 */
 	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
 		errno = ETIME;
 		return -1;
 	}
 
 	/*
 	 * register the pckt as seen
 	 */
 	_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
 
 	/*
 	 * see if there is a free buffer
 	 */
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (!src_host->defrag_buf[i].in_use) {
 			return i;
 		}
 	}
 
 	/*
 	 * at this point, there are no free buffers, the pckt is new
 	 * and we need to reclaim a buffer, and we will take the one
 	 * with the oldest timestamp. It's as good as any.
 	 */
 
 	oldest = 0;
 
 	for (i = 0; i < KNET_MAX_LINK; i++) {
 		if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
 			oldest = i;
 		}
 	}
 	src_host->defrag_buf[oldest].in_use = 0;
 	return oldest;
 }
 
 static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
 {
 	struct knet_host_defrag_buf *defrag_buf;
 	int defrag_buf_idx;
 
 	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
 	if (defrag_buf_idx < 0) {
 		if (errno == ETIME) {
 			log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
 		}
 		return 1;
 	}
 
 	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
 
 	/*
 	 * if the buf is not is use, then make sure it's clean
 	 */
 	if (!defrag_buf->in_use) {
 		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
 		defrag_buf->in_use = 1;
 		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
 	}
 
 	/*
 	 * update timestamp on the buffer
 	 */
 	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
 
 	/*
 	 * check if we already received this fragment
 	 */
 	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
 		/*
 		 * if we have received this fragment and we didn't clear the buffer
 		 * it means that we don't have all fragments yet
 		 */
 		return 1;
 	}
 
 	/*
 	 *  we need to handle the last packet with gloves due to its different size
 	 */
 
 	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
 		defrag_buf->last_frag_size = *len;
 
 		/*
 		 * in the event when the last packet arrives first,
 		 * we still don't know the offset vs the other fragments (based on MTU),
 		 * so we store the fragment at the end of the buffer where it's safe
 		 * and take a copy of the len so that we can restore its offset later.
 		 * remember we can't use the local MTU for this calculation because pMTU
 		 * can be asymettric between the same hosts.
 		 */
 		if (!defrag_buf->frag_size) {
 			defrag_buf->last_first = 1;
 			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
 			       inbuf->khp_data_userdata,
 			       *len);
 		}
 	} else {
 		defrag_buf->frag_size = *len;
 	}
 
 	memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
 	       inbuf->khp_data_userdata, *len);
 
 	defrag_buf->frag_recv++;
 	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
 
 	/*
 	 * check if we received all the fragments
 	 */
 	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
 		/*
 		 * special case the last pckt
 		 */
 
 		if (defrag_buf->last_first) {
 			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
 			        defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
 				defrag_buf->last_frag_size);
 		}
 
 		/*
 		 * recalculate packet lenght
 		 */
 
 		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
 
 		/*
 		 * copy the pckt back in the user data
 		 */
 		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
 
 		/*
 		 * free this buffer
 		 */
 		defrag_buf->in_use = 0;
 		return 0;
 	}
 
 	return 1;
 }
 
 static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
 {
 	int err = 0, savederrno = 0;
 	ssize_t outlen;
 	struct knet_host *src_host;
 	struct knet_link *src_link;
 	unsigned long long latency_last;
 	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
 	size_t dst_host_ids_entries = 0;
 	int bcast = 1;
 	struct timespec recvtime;
 	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
 	unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
 	ssize_t len = msg->msg_len;
 	struct knet_hostinfo *knet_hostinfo;
 	struct iovec iov_out[1];
 	int8_t channel;
 	struct sockaddr_storage pckt_src;
 	seq_num_t recv_seq_num;
 	int wipe_bufs = 0;
 
 	if (knet_h->crypto_instance) {
 		if (crypto_authenticate_and_decrypt(knet_h,
 						    (unsigned char *)inbuf,
 						    len,
 						    knet_h->recv_from_links_buf_decrypt,
 						    &outlen) < 0) {
 			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
 			return;
 		}
 		len = outlen;
 		inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
 	}
 
 	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
 		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
 		return;
 	}
 
 	if (inbuf->kh_version != KNET_HEADER_VERSION) {
 		log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
 		return;
 	}
 
 	inbuf->kh_node = ntohs(inbuf->kh_node);
 	src_host = knet_h->host_index[inbuf->kh_node];
 	if (src_host == NULL) {  /* host not found */
 		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
 		return;
 	}
 
 	src_link = NULL;
 
 	src_link = src_host->link +
 		(inbuf->khp_ping_link % KNET_MAX_LINK);
 	if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
 		if (src_link->dynamic == KNET_LINK_DYNIP) {
 			/*
 			 * cpyaddrport will only copy address and port of the incoming
 			 * packet and strip extra bits such as flow and scopeid
 			 */
 			cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
 
 			if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
 				    &pckt_src, sockaddr_len(&pckt_src)) != 0) {
 				log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
 					  src_host->host_id, src_link->link_id);
 				memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
 				if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
 						src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
 						src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
 					log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
 					snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
 					snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
 				} else {
 					log_info(knet_h, KNET_SUB_RX,
 						 "host: %u link: %u new connection established from: %s %s",
 						 src_host->host_id, src_link->link_id,
 						 src_link->status.dst_ipaddr, src_link->status.dst_port);
 				}
 			}
 			/*
 			 * transport has already accepted the connection here
 			 * otherwise we would not be receiving packets
 			 */
 			knet_h->transport_ops[src_link->transport_type]->transport_link_dyn_connect(knet_h, sockfd, src_link);
 		}
 	}
 
 	switch (inbuf->kh_type) {
 	case KNET_HEADER_TYPE_HOST_INFO:
 	case KNET_HEADER_TYPE_DATA:
 		/*
 		 * TODO: should we accept data even if we can't reply to the other node?
 		 *       how would that work with SCTP and guaranteed delivery?
 		 */
 
 		if (!src_host->status.reachable) {
 			log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet", src_host->host_id);
 			//return;
 		}
 		inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
 		channel = inbuf->khp_data_channel;
 		src_host->got_data = 1;
 
 		if (src_link) {
 			src_link->status.stats.rx_data_packets++;
 			src_link->status.stats.rx_data_bytes += len;
 		}
 
 		if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
 			if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
 				log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
 			}
 			return;
 		}
 
 		if (inbuf->khp_data_frag_num > 1) {
 			/*
 			 * len as received from the socket also includes extra stuff
 			 * that the defrag code doesn't care about. So strip it
 			 * here and readd only for repadding once we are done
 			 * defragging
 			 */
 			len = len - KNET_HEADER_DATA_SIZE;
 			if (pckt_defrag(knet_h, inbuf, &len)) {
 				return;
 			}
 			len = len + KNET_HEADER_DATA_SIZE;
 		}
 
+		if (inbuf->khp_data_compress) {
+			ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
+
+			if (inbuf->khp_data_compress > knet_h->compress_max_model) {
+				log_err(knet_h, KNET_SUB_COMPRESS, "Received packet with unsupported compression method. Dropping");
+				return;
+			}
+
+			err = decompress(knet_h, inbuf->khp_data_compress,
+					 (const unsigned char *)inbuf->khp_data_userdata,
+					 len - KNET_HEADER_DATA_SIZE,
+					 knet_h->recv_from_links_buf_decompress,
+					 &decmp_outlen);
+			if (!err) {
+				memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
+				len = decmp_outlen + KNET_HEADER_DATA_SIZE;
+			} else {
+				log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
+					 err, strerror(errno));
+				return;
+			}
+		}
+
 		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
 			if (knet_h->enabled != 1) /* data forward is disabled */
 				break;
 
 			if (knet_h->dst_host_filter_fn) {
 				size_t host_idx;
 				int found = 0;
 
 				bcast = knet_h->dst_host_filter_fn(
 						knet_h->dst_host_filter_fn_private_data,
 						(const unsigned char *)inbuf->khp_data_userdata,
 						len - KNET_HEADER_DATA_SIZE,
 						KNET_NOTIFY_RX,
 						knet_h->host_id,
 						inbuf->kh_node,
 						&channel,
 						dst_host_ids,
 						&dst_host_ids_entries);
 				if (bcast < 0) {
 					log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
 					return;
 				}
 
 				if ((!bcast) && (!dst_host_ids_entries)) {
 					log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
 					return;
 				}
 
 				/* check if we are dst for this packet */
 				if (!bcast) {
 					for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
 						if (dst_host_ids[host_idx] == knet_h->host_id) {
 							found = 1;
 							break;
 						}
 					}
 					if (!found) {
 						log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
 						return;
 					}
 				}
 			}
 		}
 
 		if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
 			if (!knet_h->sockfd[channel].in_use) {
 				log_debug(knet_h, KNET_SUB_RX,
 					  "received packet for channel %d but there is no local sock connected",
 					  channel);
 				return;
 			}
 
 			memset(iov_out, 0, sizeof(iov_out));
 			iov_out[0].iov_base = (void *) inbuf->khp_data_userdata;
 			iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE;
 
 			outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
 			if (outlen <= 0) {
 				knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
 						       knet_h->sockfd[channel].sockfd[0],
 						       channel,
 						       KNET_NOTIFY_RX,
 						       outlen,
 						       errno);
 				return;
 			}
 			if ((size_t)outlen == iov_out[0].iov_len) {
 				_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
 			}
 		} else { /* HOSTINFO */
 			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
 			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
 				bcast = 0;
 				knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
 			}
 			if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
 				return;
 			}
 			_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
 			switch(knet_hostinfo->khi_type) {
 				case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
 					break;
 				case KNET_HOSTINFO_TYPE_LINK_TABLE:
 					break;
 				default:
 					log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
 					break;
 			}
 		}
 		break;
 	case KNET_HEADER_TYPE_PING:
 		outlen = KNET_HEADER_PING_SIZE;
 		inbuf->kh_type = KNET_HEADER_TYPE_PONG;
 		inbuf->kh_node = htons(knet_h->host_id);
 		recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
 		src_link->status.stats.rx_ping_packets++;
 		src_link->status.stats.rx_ping_bytes += len;
 
 		wipe_bufs = 0;
 
 		if (!inbuf->khp_ping_timed) {
 			/*
 			 * we might be receiving this message from all links, but we want
 			 * to process it only the first time
 			 */
 			if (recv_seq_num != src_host->untimed_rx_seq_num) {
 				/*
 				 * cache the untimed seq num
 				 */
 				src_host->untimed_rx_seq_num = recv_seq_num;
 				/*
 				 * if the host has received data in between
 				 * untimed ping, then we don't need to wipe the bufs
 				 */
 				if (src_host->got_data) {
 					src_host->got_data = 0;
 					wipe_bufs = 0;
 				} else {
 					wipe_bufs = 1;
 				}
 			}
 			_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
 		} else {
 			/*
 			 * pings always arrives in bursts over all the link
 			 * catch the first of them to cache the seq num and
 			 * avoid duplicate processing
 			 */
 			if (recv_seq_num != src_host->timed_rx_seq_num) {
 				src_host->timed_rx_seq_num = recv_seq_num;
 
 				if (recv_seq_num == 0) {
 					_seq_num_lookup(src_host, recv_seq_num, 0, 1);
 				}
 			}
 		}
 
 		if (knet_h->crypto_instance) {
 			if (crypto_encrypt_and_sign(knet_h,
 						    (const unsigned char *)inbuf,
 						    len,
 						    knet_h->recv_from_links_buf_crypt,
 						    &outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
 				break;
 			}
 			outbuf = knet_h->recv_from_links_buf_crypt;
 		}
 
 retry_pong:
 		len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
 				(struct sockaddr *) &src_link->dst_addr,
 				sizeof(struct sockaddr_storage));
 		savederrno = errno;
 		if (len != outlen) {
 			err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
 			switch(err) {
 				case -1: /* unrecoverable error */
 					log_debug(knet_h, KNET_SUB_RX,
 						  "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
 						  src_link->outsock, errno, strerror(errno),
 						  src_link->status.src_ipaddr, src_link->status.src_port,
 						  src_link->status.dst_ipaddr, src_link->status.dst_port);
 					src_link->status.stats.tx_pong_errors++;
 					break;
 				case 0: /* ignore error and continue */
 					break;
 				case 1: /* retry to send those same data */
 					src_link->status.stats.tx_pong_retries++;
 					goto retry_pong;
 					break;
 			}
 		}
 		src_link->status.stats.tx_pong_packets++;
 		src_link->status.stats.tx_pong_bytes += outlen;
 		break;
 	case KNET_HEADER_TYPE_PONG:
 		src_link->status.stats.rx_pong_packets++;
 		src_link->status.stats.rx_pong_bytes += len;
 		clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
 
 		memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
 		timespec_diff(recvtime,
 				src_link->status.pong_last, &latency_last);
 
 		src_link->status.latency =
 			((src_link->status.latency * src_link->latency_exp) +
 			((latency_last / 1000llu) *
 				(src_link->latency_fix - src_link->latency_exp))) /
 					src_link->latency_fix;
 
 		if (src_link->status.latency < src_link->pong_timeout) {
 			if (!src_link->status.connected) {
 				if (src_link->received_pong >= src_link->pong_count) {
 					log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
 						 src_host->host_id, src_link->link_id);
 					_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1);
 				} else {
 					src_link->received_pong++;
 					log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
 						  src_host->host_id, src_link->link_id, src_link->received_pong);
 				}
 			}
 		}
 		/* Calculate latency stats */
 		if (src_link->status.latency > src_link->status.stats.latency_max) {
 			src_link->status.stats.latency_max = src_link->status.latency;
 		}
 		if (src_link->status.latency < src_link->status.stats.latency_min) {
 			src_link->status.stats.latency_min = src_link->status.latency;
 		}
 		src_link->status.stats.latency_ave =
 			(src_link->status.stats.latency_ave * src_link->status.stats.latency_samples +
 			 src_link->status.latency) / (src_link->status.stats.latency_samples+1);
 		src_link->status.stats.latency_samples++;
 
 		break;
 	case KNET_HEADER_TYPE_PMTUD:
 		src_link->status.stats.rx_pmtu_packets++;
 		src_link->status.stats.rx_pmtu_bytes += len;
 		outlen = KNET_HEADER_PMTUD_SIZE;
 		inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
 		inbuf->kh_node = htons(knet_h->host_id);
 
 		if (knet_h->crypto_instance) {
 			if (crypto_encrypt_and_sign(knet_h,
 						    (const unsigned char *)inbuf,
 						    len,
 						    knet_h->recv_from_links_buf_crypt,
 						    &outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
 				break;
 			}
 			outbuf = knet_h->recv_from_links_buf_crypt;
 		}
 
 retry_pmtud:
 		len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
 				(struct sockaddr *) &src_link->dst_addr,
 				sizeof(struct sockaddr_storage));
 		if (len != outlen) {
 			err = knet_h->transport_ops[src_link->transport_type]->transport_tx_sock_error(knet_h, src_link->outsock, len, savederrno);
 			switch(err) {
 				case -1: /* unrecoverable error */
 					log_debug(knet_h, KNET_SUB_RX,
 						  "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
 						  src_link->outsock, errno, strerror(errno),
 						  src_link->status.src_ipaddr, src_link->status.src_port,
 						  src_link->status.dst_ipaddr, src_link->status.dst_port);
 
 					src_link->status.stats.tx_pmtu_errors++;
 					break;
 				case 0: /* ignore error and continue */
 					src_link->status.stats.tx_pmtu_errors++;
 					break;
 				case 1: /* retry to send those same data */
 					src_link->status.stats.tx_pmtu_retries++;
 					goto retry_pmtud;
 					break;
 			}
 		}
 
 		break;
 	case KNET_HEADER_TYPE_PMTUD_REPLY:
 		src_link->status.stats.rx_pmtu_packets++;
 		src_link->status.stats.rx_pmtu_bytes += len;
 		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
 			log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
 			break;
 		}
 		src_link->last_recv_mtu = inbuf->khp_pmtud_size;
 		pthread_cond_signal(&knet_h->pmtud_cond);
 		pthread_mutex_unlock(&knet_h->pmtud_mutex);
 		break;
 	default:
 		return;
 	}
 }
 
 static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
 {
 	int err, savederrno;
 	int i, msg_recv, transport;
 
 	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
 		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
 		return;
 	}
 
 	if (_is_valid_fd(knet_h, sockfd) < 1) {
 		/*
 		 * this is normal if a fd got an event and before we grab the read lock
 		 * and the link is removed by another thread
 		 */
 		goto exit_unlock;
 	}
 
 	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
 
 	/*
 	 * reset msg_namelen to buffer size because after recvmmsg
 	 * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
 	 */
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 	}
 
 	msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
 	savederrno = errno;
 
 	/*
 	 * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
 	 * recvmmsg can return:
 	 * -1 on error
 	 *  0 if the previous run of recvmmsg recorded an error on the socket
 	 *  N number of messages (see exception below).
 	 *
 	 * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
 	 * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
 	 * it will be visibile in the next run.
 	 *
 	 * Need to be careful how we handle errors at this stage.
 	 *
 	 * error messages need to be handled on a per transport/protocol base
 	 * at this point we have different layers of error handling
 	 * - msg_recv < 0 -> error from this run
 	 *   msg_recv = 0 -> error from previous run and error on socket needs to be cleared
 	 * - per-transport message data
 	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
 	 *            but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
 	 * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
 	 *         errno set. That means the error api needs to be able to abort the loop below.
 	 */
 
 	if (msg_recv <= 0) {
 		knet_h->transport_ops[transport]->transport_rx_sock_error(knet_h, sockfd, msg_recv, savederrno);
 		goto exit_unlock;
 	}
 
 	for (i = 0; i < msg_recv; i++) {
 		err = knet_h->transport_ops[transport]->transport_rx_is_data(knet_h, sockfd, &msg[i]);
 
 		/*
 		 * TODO: make this section silent once we are confident
 		 *       all protocols packet handlers are good
 		 */
 
 		switch(err) {
 			case -1: /* on error */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
 				goto exit_unlock;
 				break;
 			case 0: /* packet is not data and we should continue the packet process loop */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
 				break;
 			case 1: /* packet is not data and we should STOP the packet process loop */
 				log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
 				goto exit_unlock;
 				break;
 			case 2: /* packet is data and should be parsed as such */
 				_parse_recv_from_links(knet_h, sockfd, &msg[i]);
 				break;
 		}
 	}
 
 exit_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 }
 
 void *_handle_recv_from_links_thread(void *data)
 {
 	int i, nev;
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 	struct sockaddr_storage address[PCKT_RX_BUFS];
 	struct knet_mmsghdr msg[PCKT_RX_BUFS];
 	struct iovec iov_in[PCKT_RX_BUFS];
 
 	memset(&msg, 0, sizeof(msg));
 
 	for (i = 0; i < PCKT_RX_BUFS; i++) {
 		iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
 		iov_in[i].iov_len = KNET_DATABUFSIZE;
 
 		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
 
 		msg[i].msg_hdr.msg_name = &address[i];
 		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[i].msg_hdr.msg_iov = &iov_in[i];
 		msg[i].msg_hdr.msg_iovlen = 1;
 	}
 
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1);
 
 		for (i = 0; i < nev; i++) {
 			_handle_recv_from_links(knet_h, events[i].data.fd, msg);
 		}
 	}
 
 	return NULL;
 }
diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c
index 2913727b..41724d4b 100644
--- a/libknet/threads_tx.c
+++ b/libknet/threads_tx.c
@@ -1,651 +1,679 @@
 /*
  * Copyright (C) 2010-2017 Red Hat, Inc.  All rights reserved.
  *
  * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
  *          Federico Simoncelli <fsimon@kronosnet.org>
  *
  * This software licensed under GPL-2.0+, LGPL-2.0+
  */
 
 #include "config.h"
 
 #include <math.h>
 #include <string.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/uio.h>
 #include <errno.h>
 
 #include "compat.h"
+#include "compress.h"
 #include "crypto.h"
 #include "host.h"
 #include "link.h"
 #include "logging.h"
 #include "transports.h"
 #include "threads_common.h"
 #include "threads_heartbeat.h"
 #include "threads_tx.h"
 #include "netutils.h"
 
 /*
  * SEND
  */
 
 static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
 {
 	int link_idx, msg_idx, sent_msgs, prev_sent, progress;
 	int err = 0, savederrno = 0;
 	unsigned int i;
 	struct knet_mmsghdr *cur;
 	struct knet_link *cur_link;
 
 	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
 		sent_msgs = 0;
 		prev_sent = 0;
 		progress = 1;
 
 		cur_link = &dst_host->link[dst_host->active_links[link_idx]];
 
 		if (cur_link->transport_type == KNET_TRANSPORT_LOOPBACK) {
 			continue;
 		}
 
 		msg_idx = 0;
 		while (msg_idx < msgs_to_send) {
 			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
 
 			for (i=0; i<msg[msg_idx].msg_hdr.msg_iovlen; i++) {
 				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
 			}
 			cur_link->status.stats.tx_data_packets++;
 			msg_idx++;
 		}
 
 retry:
 		cur = &msg[prev_sent];
 
 		sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
 				      &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
 		savederrno = errno;
 
 		err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
 		switch(err) {
 			case -1: /* unrecoverable error */
 				cur_link->status.stats.tx_data_errors++;
 				goto out_unlock;
 				break;
 			case 0: /* ignore error and continue */
 				break;
 			case 1: /* retry to send those same data */
 				cur_link->status.stats.tx_data_retries++;
 				goto retry;
 				break;
 		}
 
 		prev_sent = prev_sent + sent_msgs;
 
 		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
 			if ((sent_msgs) || (progress)) {
 				if (sent_msgs) {
 					progress = 1;
 				} else {
 					progress = 0;
 				}
 #ifdef DEBUG
 				log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
 					  sent_msgs, msg_idx,
 					  dst_host->name, dst_host->host_id,
 					  dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
 					  dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
 					  dst_host->link[dst_host->active_links[link_idx]].link_id);
 #endif
 				goto retry;
 			}
 			if (!progress) {
 				savederrno = EAGAIN;
 				err = -1;
 				goto out_unlock;
 			}
 		}
 
 		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
 		    (dst_host->active_link_entries > 1)) {
 			uint8_t cur_link_id = dst_host->active_links[0];
 
 			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
 			dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;
 
 			break;
 		}
 	}
 
 out_unlock:
 	errno = savederrno;
 	return err;
 }
 
 static int _parse_recv_from_sock(knet_handle_t knet_h, ssize_t inlen, int8_t channel, int is_sync)
 {
 	ssize_t outlen, frag_len;
 	struct knet_host *dst_host;
 	knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
 	size_t dst_host_ids_entries_temp = 0;
 	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
 	size_t dst_host_ids_entries = 0;
 	int bcast = 1;
 	struct knet_hostinfo *knet_hostinfo;
 	struct iovec iov_out[PCKT_FRAG_MAX][2];
 	int iovcnt_out = 2;
 	uint8_t frag_idx;
 	unsigned int temp_data_mtu;
 	size_t host_idx;
 	int send_mcast = 0;
 	struct knet_header *inbuf;
 	int savederrno = 0;
 	int err = 0;
 	seq_num_t tx_seq_num;
 	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
 	int msgs_to_send, msg_idx;
 	unsigned int i;
 	int send_local = 0;
+	int data_compressed = 0;
 
 	inbuf = knet_h->recv_from_sock_buf;
 
 	if ((knet_h->enabled != 1) &&
 	    (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
 		log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
 		savederrno = ECANCELED;
 		err = -1;
 		goto out_unlock;
 	}
 
 	/*
 	 * move this into a separate function to expand on
 	 * extra switching rules
 	 */
 	switch(inbuf->kh_type) {
 		case KNET_HEADER_TYPE_DATA:
 			if (knet_h->dst_host_filter_fn) {
 				bcast = knet_h->dst_host_filter_fn(
 						knet_h->dst_host_filter_fn_private_data,
 						(const unsigned char *)inbuf->khp_data_userdata,
 						inlen,
 						KNET_NOTIFY_TX,
 						knet_h->host_id,
 						knet_h->host_id,
 						&channel,
 						dst_host_ids_temp,
 						&dst_host_ids_entries_temp);
 				if (bcast < 0) {
 					log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
 					savederrno = EFAULT;
 					err = -1;
 					goto out_unlock;
 				}
 
 				if ((!bcast) && (!dst_host_ids_entries_temp)) {
 					log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
 					savederrno = EINVAL;
 					err = -1;
 					goto out_unlock;
 				}
 			}
 
 			/* Send to localhost if appropriate and enabled */
 			if (knet_h->has_loop_link) {
 				send_local = 0;
 				if (bcast) {
 					send_local = 1;
 				} else {
 					for (i=0; i< dst_host_ids_entries_temp; i++) {
 						if (dst_host_ids_temp[i] == knet_h->host_id) {
 							send_local = 1;
 						}
 					}
 				}
 				if (send_local) {
 					const unsigned char *buf = inbuf->khp_data_userdata;
 					ssize_t buflen = inlen;
 					struct knet_link *local_link;
 
 					local_link = knet_h->host_index[knet_h->host_id]->link;
 
 				local_retry:
 					err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
 					if (err < 0) {
 						log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
 						local_link->status.stats.tx_data_errors++;
 					}
 					if (err > 0 && err < buflen) {
 						log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %ld\n", err, inlen);
 						local_link->status.stats.tx_data_retries++;
 						buf += err;
 						buflen -= err;
 						usleep(KNET_THREADS_TIMERES / 16);
 						goto local_retry;
 					}
 					if (err == buflen) {
 						local_link->status.stats.tx_data_packets++;
 						local_link->status.stats.tx_data_bytes += inlen;
 					}
 				}
 			}
 			break;
 		case KNET_HEADER_TYPE_HOST_INFO:
 			knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
 			if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
 				bcast = 0;
 				dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id;
 				dst_host_ids_entries_temp = 1;
 				knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id);
 			}
 			break;
 		default:
 			log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
 			savederrno = ENOMSG;
 			err = -1;
 			goto out_unlock;
 			break;
 	}
 
 	if (is_sync) {
 		if ((bcast) ||
 		    ((!bcast) && (dst_host_ids_entries_temp > 1))) {
 			log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
 			savederrno = E2BIG;
 			err = -1;
 			goto out_unlock;
 		}
 	}
 
 	/*
 	 * check destinations hosts before spending time
 	 * in fragmenting/encrypting packets to save
 	 * time processing data for unreachable hosts.
 	 * for unicast, also remap the destination data
 	 * to skip unreachable hosts.
 	 */
 
 	if (!bcast) {
 		dst_host_ids_entries = 0;
 		for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
 			dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
 			if (!dst_host) {
 				continue;
 			}
 			if (!(dst_host->host_id == knet_h->host_id &&
 			     knet_h->has_loop_link) &&
 			    dst_host->status.reachable) {
 				dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
 				dst_host_ids_entries++;
 			}
 		}
 		if (!dst_host_ids_entries) {
 			savederrno = EHOSTDOWN;
 			err = -1;
 			goto out_unlock;
 		}
 	} else {
 		send_mcast = 0;
 		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
 			if (!(dst_host->host_id == knet_h->host_id &&
 			      knet_h->has_loop_link) &&
 			    dst_host->status.reachable) {
 				send_mcast = 1;
 				break;
 			}
 		}
 		if (!send_mcast) {
 			savederrno = EHOSTDOWN;
 			err = -1;
 			goto out_unlock;
 		}
 	}
 
 	if (!knet_h->data_mtu) {
 		/*
 		 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
 		 */
 		log_debug(knet_h, KNET_SUB_TX,
 			  "Received data packet but data MTU is still unknown."
 			  " Packet might not be delivered."
 			  " Assuming mininum IPv4 mtu (%d)",
 			  KNET_PMTUD_MIN_MTU_V4);
 		temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
 	} else {
 		/*
 		 * take a copy of the mtu to avoid value changing under
 		 * our feet while we are sending a fragmented pckt
 		 */
 		temp_data_mtu = knet_h->data_mtu;
 	}
 
+	/*
+	 * compress data
+	 */
+	if (knet_h->compress_model > 0) {
+		ssize_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
+
+		err = compress(knet_h,
+			       (const unsigned char *)inbuf->khp_data_userdata, inlen,
+			       knet_h->send_to_links_buf_compress, &cmp_outlen);
+		if (err < 0) {
+			log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(errno));
+		} else {
+			if (cmp_outlen < inlen) {
+				memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
+				inlen = cmp_outlen;
+				data_compressed = 1;
+			}
+		}
+	}
+
 	/*
 	 * prepare the outgoing buffers
 	 */
 
 	frag_len = inlen;
 	frag_idx = 0;
 
 	inbuf->khp_data_bcast = bcast;
 	inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
 	inbuf->khp_data_channel = channel;
+	if (data_compressed) {
+		inbuf->khp_data_compress = knet_h->compress_model;
+	} else {
+		inbuf->khp_data_compress = 0;
+	}
 
 	if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
 		log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
 		goto out_unlock;
 	}
 	knet_h->tx_seq_num++;
 	/*
 	 * force seq_num 0 to detect a node that has crashed and rejoining
 	 * the knet instance. seq_num 0 will clear the buffers in the RX
 	 * thread
 	 */
 	if (knet_h->tx_seq_num == 0) {
 		knet_h->tx_seq_num++;
 	}
 	/*
 	 * cache the value in locked context
 	 */
 	tx_seq_num = knet_h->tx_seq_num;
 	inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
 	pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
 
 	/*
 	 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
 	 * pckts.
 	 * this solves 2 problems:
 	 * 1) on TX socket overloads we generate extra pings to keep links alive
 	 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
 	 *    node 3+ will be able to keep in sync on the TX seq_num even without
 	 *    receiving traffic or pings in betweens. This avoids issues with
 	 *    rollover of the circular buffer
 	 */
 
 	if (tx_seq_num % (SEQ_MAX / 8) == 0) {
 		_send_pings(knet_h, 0);
 	}
 
 	if (inbuf->khp_data_frag_num > 1) {
 		while (frag_idx < inbuf->khp_data_frag_num) {
 			/*
 			 * set the iov_base
 			 */
 			iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
 			iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
 			iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
 
 			/*
 			 * set the len
 			 */
 			if (frag_len > temp_data_mtu) {
 				iov_out[frag_idx][1].iov_len = temp_data_mtu;
 			} else {
 				iov_out[frag_idx][1].iov_len = frag_len;
 			}
 
 			/*
 			 * copy the frag info on all buffers
 			 */
 			knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
 			knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
+			knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
 
 			frag_len = frag_len - temp_data_mtu;
 			frag_idx++;
 		}
 		iovcnt_out = 2;
 	} else {
 		iov_out[frag_idx][0].iov_base = (void *)inbuf;
 		iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
 		iovcnt_out = 1;
 	}
 
 	if (knet_h->crypto_instance) {
 		frag_idx = 0;
 		while (frag_idx < inbuf->khp_data_frag_num) {
 			if (crypto_encrypt_and_signv(
 					knet_h,
 					iov_out[frag_idx], iovcnt_out,
 					knet_h->send_to_links_buf_crypt[frag_idx],
 					&outlen) < 0) {
 				log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
 				savederrno = ECHILD;
 				err = -1;
 				goto out_unlock;
 			}
 			iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
 			iov_out[frag_idx][0].iov_len = outlen;
 			frag_idx++;
 		}
 		iovcnt_out = 1;
 	}
 
 	memset(&msg, 0, sizeof(msg));
 
 	msgs_to_send = inbuf->khp_data_frag_num;
 
 	msg_idx = 0;
 
 	while (msg_idx < msgs_to_send) {
 		msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
 		msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
 		msg_idx++;
 	}
 
 	if (!bcast) {
 		for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
 			dst_host = knet_h->host_index[dst_host_ids[host_idx]];
 
 			err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 			savederrno = errno;
 			if (err) {
 				goto out_unlock;
 			}
 		}
 	} else {
 		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
 			if (dst_host->status.reachable) {
 				err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 				savederrno = errno;
 				if (err) {
 					goto out_unlock;
 				}
 			}
 		}
 	}
 
 out_unlock:
 	errno = savederrno;
 	return err;
 }
 
 int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0, err = 0;
 
 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}
 
 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
 			strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}
 
 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out;
 	}
 
 	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
 			strerror(savederrno));
 		err = -1;
 		goto out;
 	}
 
 	knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
 	memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
 	err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
 	savederrno = errno;
 
 	pthread_mutex_unlock(&knet_h->tx_mutex);
 
 out:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);
 
 	errno = savederrno;
 	return err;
 }
 
 static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
 {
 	ssize_t inlen = 0;
 	int savederrno = 0, docallback = 0;
 
 	if ((channel >= 0) &&
 	    (channel < KNET_DATAFD_MAX) &&
 	    (!knet_h->sockfd[channel].is_socket)) {
 		inlen = readv(sockfd, msg->msg_iov, 1);
 	} else {
 		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
 	}
 
 	if (inlen == 0) {
 		savederrno = 0;
 		docallback = 1;
 		goto out;
 	}
 	if (inlen < 0) {
 		savederrno = errno;
 		docallback = 1;
 		goto out;
 	}
 
 	knet_h->recv_from_sock_buf->kh_type = type;
 	_parse_recv_from_sock(knet_h, inlen, channel, 0);
 
 out:
 	if (inlen < 0) {
 		struct epoll_event ev;
 
 		memset(&ev, 0, sizeof(struct epoll_event));
 
 		if (epoll_ctl(knet_h->send_to_links_epollfd,
 			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
 			log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
 				knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
 		} else {
 			knet_h->sockfd[channel].has_error = 1;
 		}
 
 	}
 
 	if (docallback) {
 		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
 				       knet_h->sockfd[channel].sockfd[0],
 				       channel,
 				       KNET_NOTIFY_TX,
 				       inlen,
 				       savederrno);
 	}
 }
 
 void *_handle_send_to_links_thread(void *data)
 {
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 	int i, nev, type;
 	int8_t channel;
 	struct iovec iov_in;
 	struct msghdr msg;
 	struct sockaddr_storage address;
 
 	memset(&iov_in, 0, sizeof(iov_in));
 	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
 	iov_in.iov_len = KNET_MAX_PACKET_SIZE;
 
 	memset(&msg, 0, sizeof(struct msghdr));
 	msg.msg_name = &address;
 	msg.msg_namelen = sizeof(struct sockaddr_storage);
 	msg.msg_iov = &iov_in;
 	msg.msg_iovlen = 1;
 
 	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
 	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
 	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);
 
 	for (i = 0; i < PCKT_FRAG_MAX; i++) {
 		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
 		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
 		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
 	}
 
 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1);
 
 		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
 			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
 			continue;
 		}
 
 		for (i = 0; i < nev; i++) {
 			if (events[i].data.fd == knet_h->hostsockfd[0]) {
 				type = KNET_HEADER_TYPE_HOST_INFO;
 				channel = -1;
 			} else {
 				type = KNET_HEADER_TYPE_DATA;
 				for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
 					if ((knet_h->sockfd[channel].in_use) &&
 					    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
 						break;
 					}
 				}
 			}
 			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
 				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
 				continue;
 			}
 			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
 			pthread_mutex_unlock(&knet_h->tx_mutex);
 		}
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 	}
 
 	return NULL;
 }