Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/Makefile.am b/libknet/Makefile.am
index 55d0978d..695bb8a1 100644
--- a/libknet/Makefile.am
+++ b/libknet/Makefile.am
@@ -1,167 +1,169 @@
#
# Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
# Federico Simoncelli <fsimon@kronosnet.org>
#
# This software licensed under GPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
SYMFILE = libknet_exported_syms
EXTRA_DIST = $(SYMFILE)
SUBDIRS = . tests
# https://www.gnu.org/software/libtool/manual/html_node/Updating-version-info.html
libversion = 2:0:0
# override global LIBS that pulls in lots of craft we don't need here
LIBS =
+# onwire_v1.c / onwire_v1.h hold the helpers that build and parse version 1 ping/pong packets
sources = \
common.c \
compat.c \
compress.c \
crypto.c \
handle.c \
handle_api.c \
host.c \
links.c \
links_acl.c \
links_acl_ip.c \
links_acl_loopback.c \
logging.c \
netutils.c \
onwire.c \
+ onwire_v1.c \
threads_common.c \
threads_dsthandler.c \
threads_heartbeat.c \
threads_pmtud.c \
threads_rx.c \
threads_tx.c \
transports.c \
transport_common.c \
transport_loopback.c \
transport_udp.c \
transport_sctp.c
include_HEADERS = libknet.h
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libknet.pc
noinst_HEADERS = \
common.h \
compat.h \
compress.h \
compress_model.h \
crypto.h \
crypto_model.h \
host.h \
internals.h \
links.h \
links_acl.h \
links_acl_ip.h \
links_acl_loopback.h \
logging.h \
netutils.h \
onwire.h \
+ onwire_v1.h \
threads_common.h \
threads_dsthandler.h \
threads_heartbeat.h \
threads_pmtud.h \
threads_rx.h \
threads_tx.h \
transports.h \
transport_common.h \
transport_loopback.h \
transport_udp.h \
transport_sctp.h
lib_LTLIBRARIES = libknet.la
libknet_la_SOURCES = $(sources)
AM_CFLAGS += $(libqb_CFLAGS)
libknet_la_CFLAGS = $(AM_CFLAGS) $(PTHREAD_CFLAGS)
EXTRA_libknet_la_DEPENDENCIES = $(SYMFILE)
libknet_la_LDFLAGS = $(AM_LDFLAGS) \
-Wl,--version-script=$(srcdir)/$(SYMFILE) \
-Wl,-rpath=$(pkglibdir) \
-version-info $(libversion)
libknet_la_LIBADD = $(PTHREAD_LIBS) $(dl_LIBS) $(rt_LIBS) $(m_LIBS)
# Prepare empty value for appending
pkglib_LTLIBRARIES =
# MODULE_LDFLAGS would mean a target-specific variable for Automake
MODULELDFLAGS = $(AM_LDFLAGS) -module -avoid-version -export-dynamic
if BUILD_COMPRESS_ZSTD
pkglib_LTLIBRARIES += compress_zstd.la
compress_zstd_la_LDFLAGS = $(MODULELDFLAGS)
compress_zstd_la_CFLAGS = $(AM_CFLAGS) $(libzstd_CFLAGS)
compress_zstd_la_LIBADD = $(libzstd_LIBS)
endif
if BUILD_COMPRESS_ZLIB
pkglib_LTLIBRARIES += compress_zlib.la
compress_zlib_la_LDFLAGS = $(MODULELDFLAGS)
compress_zlib_la_CFLAGS = $(AM_CFLAGS) $(zlib_CFLAGS)
compress_zlib_la_LIBADD = $(zlib_LIBS)
endif
if BUILD_COMPRESS_LZ4
pkglib_LTLIBRARIES += compress_lz4.la compress_lz4hc.la
compress_lz4_la_LDFLAGS = $(MODULELDFLAGS)
compress_lz4_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS)
compress_lz4_la_LIBADD = $(liblz4_LIBS)
compress_lz4hc_la_LDFLAGS = $(MODULELDFLAGS)
compress_lz4hc_la_CFLAGS = $(AM_CFLAGS) $(liblz4_CFLAGS)
compress_lz4hc_la_LIBADD = $(liblz4_LIBS)
endif
if BUILD_COMPRESS_LZO2
pkglib_LTLIBRARIES += compress_lzo2.la
compress_lzo2_la_LDFLAGS = $(MODULELDFLAGS)
compress_lzo2_la_CFLAGS = $(AM_CFLAGS) $(lzo2_CFLAGS)
compress_lzo2_la_LIBADD = $(lzo2_LIBS)
endif
if BUILD_COMPRESS_LZMA
pkglib_LTLIBRARIES += compress_lzma.la
compress_lzma_la_LDFLAGS = $(MODULELDFLAGS)
compress_lzma_la_CFLAGS = $(AM_CFLAGS) $(liblzma_CFLAGS)
compress_lzma_la_LIBADD = $(liblzma_LIBS)
endif
if BUILD_COMPRESS_BZIP2
pkglib_LTLIBRARIES += compress_bzip2.la
compress_bzip2_la_LDFLAGS = $(MODULELDFLAGS)
compress_bzip2_la_CFLAGS = $(AM_CFLAGS) $(bzip2_CFLAGS)
compress_bzip2_la_LIBADD = $(bzip2_LIBS)
endif
if BUILD_CRYPTO_NSS
pkglib_LTLIBRARIES += crypto_nss.la
crypto_nss_la_LDFLAGS = $(MODULELDFLAGS)
crypto_nss_la_CFLAGS = $(AM_CFLAGS) $(nss_CFLAGS)
crypto_nss_la_LIBADD = $(nss_LIBS)
endif
if BUILD_CRYPTO_OPENSSL
pkglib_LTLIBRARIES += crypto_openssl.la
crypto_openssl_la_LDFLAGS = $(MODULELDFLAGS)
crypto_openssl_la_CFLAGS = $(AM_CFLAGS) $(openssl_CFLAGS)
crypto_openssl_la_LIBADD = $(openssl_LIBS)
endif
diff --git a/libknet/onwire.h b/libknet/onwire.h
index c31f7d7d..7f0285ef 100644
--- a/libknet/onwire.h
+++ b/libknet/onwire.h
@@ -1,165 +1,156 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_ONWIRE_H__
#define __KNET_ONWIRE_H__
#include <stdint.h>
#include "libknet.h"
/*
* data structures to define network packets.
* Start from knet_header at the bottom
*/
/*
* Plan is to support MAX_VER with MIN_VER = MAX_VER - 1
* but for the sake of not rewriting the world later on,
* let´s make sure we can support a random range of protocol
* versions
*/
#define KNET_HEADER_ONWIRE_MAX_VER 0x01 /* max onwire protocol supported by this build */
#define KNET_HEADER_ONWIRE_MIN_VER 0x01 /* min onwire protocol supported by this build */
/*
* Packet types
*/
#define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */
-/*
- * NOTE: adding packets in the PMSK requires changes to thread_rx.c
- * KNET_LINK_DYNIP code handling as thread_rx expects
- * link_id as first uint8_t in the packet structure.
- * See also above pmtud and ping/pong packets.
- */
-
-#define KNET_HEADER_TYPE_PMSK 0x80 /* packet mask */
#define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */
#define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */
#define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */
#define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */
/*
* KNET_HEADER_TYPE_DATA
*/
typedef uint16_t seq_num_t; /* data sequence number required to deduplicate pckts */
#define SEQ_MAX UINT16_MAX
struct knet_header_payload_data {
seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pckts */
uint8_t khp_data_compress; /* identify if user data are compressed */
uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */
uint8_t khp_data_bcast; /* data destination bcast/ucast */
uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */
uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */
int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */
uint8_t khp_data_userdata[0]; /* pointer to the real user data */
} __attribute__((packed));
#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num
#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num
#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq
#define khp_data_userdata kh_payload.khp_data.khp_data_userdata
#define khp_data_bcast kh_payload.khp_data.khp_data_bcast
#define khp_data_channel kh_payload.khp_data.khp_data_channel
#define khp_data_compress kh_payload.khp_data.khp_data_compress
/*
* KNET_HEADER_TYPE_PING / KNET_HEADER_TYPE_PONG
*/
+/*
+ * Onwire version 1 layout of the ping/pong payload. A pong reuses the
+ * received ping buffer, so both packet types share this struct; fields are
+ * reached through the khp_ping_v1_* accessor macros below.
+ * NOTE(review): this change drops the "requires changes to thread_rx.c
+ * KNET_LINK_DYNIP code handling" warning from khp_pmtud_link and from the
+ * removed PMSK block, but keeps it on khp_ping_link; confirm which is current.
+ */
-struct knet_header_payload_ping {
+struct knet_header_payload_ping_v1 {
uint8_t khp_ping_link; /* changing khp_ping_link requires changes to thread_rx.c
KNET_LINK_DYNIP code handling */
uint32_t khp_ping_time[4]; /* ping timestamp */
seq_num_t khp_ping_seq_num; /* transport host seq_num */
uint8_t khp_ping_timed; /* timed pinged (1) or forced by seq_num (0) */
} __attribute__((packed));
-#define khp_ping_link kh_payload.khp_ping.khp_ping_link
-#define khp_ping_time kh_payload.khp_ping.khp_ping_time
-#define khp_ping_seq_num kh_payload.khp_ping.khp_ping_seq_num
-#define khp_ping_timed kh_payload.khp_ping.khp_ping_timed
+/* accessors for the v1 ping/pong fields of struct knet_header (kh_payload.khp_ping_v1) */
+#define khp_ping_v1_link kh_payload.khp_ping_v1.khp_ping_link
+#define khp_ping_v1_time kh_payload.khp_ping_v1.khp_ping_time
+#define khp_ping_v1_seq_num kh_payload.khp_ping_v1.khp_ping_seq_num
+#define khp_ping_v1_timed kh_payload.khp_ping_v1.khp_ping_timed
/*
* KNET_HEADER_TYPE_PMTUD / KNET_HEADER_TYPE_PMTUD_REPLY
*/
/*
* taken from tracepath6
*/
#define KNET_PMTUD_SIZE_V4 65535
#define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4
/*
* IPv4/IPv6 header size
*/
#define KNET_PMTUD_OVERHEAD_V4 20
#define KNET_PMTUD_OVERHEAD_V6 40
#define KNET_PMTUD_MIN_MTU_V4 576
#define KNET_PMTUD_MIN_MTU_V6 1280
struct knet_header_payload_pmtud {
- uint8_t khp_pmtud_link; /* changing khp_pmtud_link requires changes to thread_rx.c
- KNET_LINK_DYNIP code handling */
+ uint8_t khp_pmtud_link; /* link_id */
uint16_t khp_pmtud_size; /* size of the current packet */
uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */
} __attribute__((packed));
#define khp_pmtud_link kh_payload.khp_pmtud.khp_pmtud_link
#define khp_pmtud_size kh_payload.khp_pmtud.khp_pmtud_size
#define khp_pmtud_data kh_payload.khp_pmtud.khp_pmtud_data
/*
* PMTUd related functions
*/
size_t calc_data_outlen(knet_handle_t knet_h, size_t inlen);
size_t calc_max_data_outlen(knet_handle_t knet_h, size_t inlen);
size_t calc_min_mtu(knet_handle_t knet_h);
/*
* union to reference possible individual payloads
*/
union knet_header_payload {
- struct knet_header_payload_data khp_data; /* pure data packet struct */
- struct knet_header_payload_ping khp_ping; /* heartbeat packet struct */
- struct knet_header_payload_pmtud khp_pmtud; /* Path MTU discovery packet struct */
+ struct knet_header_payload_data khp_data; /* pure data packet struct */
+ struct knet_header_payload_ping_v1 khp_ping_v1; /* heartbeat packet struct */
+ struct knet_header_payload_pmtud khp_pmtud; /* Path MTU discovery packet struct */
} __attribute__((packed));
/*
* this header CANNOT change or onwire compat will break!
*/
struct knet_header {
uint8_t kh_version; /* this pckt format/version */
uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */
knet_node_id_t kh_node; /* host id of the source host for this pckt */
uint8_t kh_max_ver; /* max version of the protocol supported by this node */
uint8_t kh_pad1; /* make sure to have space in the header to grow features */
union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */
} __attribute__((packed));
/*
* extra defines to avoid mingling with sizeof() too much
*/
#define KNET_HEADER_ALL_SIZE sizeof(struct knet_header)
#define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload))
-#define KNET_HEADER_PING_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping))
+/* on-wire size of a v1 ping or pong packet: common header + v1 ping/pong payload */
+#define KNET_HEADER_PING_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping_v1))
#define KNET_HEADER_PMTUD_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud))
#define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data))
#endif
diff --git a/libknet/onwire_v1.c b/libknet/onwire_v1.c
new file mode 100644
index 00000000..d077ce67
--- /dev/null
+++ b/libknet/onwire_v1.c
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2020 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under LGPL-2.0+
+ */
+
+#include "config.h"
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <time.h>
+
+#include "logging.h"
+#include "host.h"
+#include "links.h"
+#include "onwire_v1.h"
+
+/*
+ * Build a version 1 ping for dst_link in knet_h->pingbuf.
+ *
+ * onwire_ver is written as the packet version, clock_now is copied into
+ * the payload as the send timestamp, and the current tx_seq_num (read under
+ * tx_seq_num_mutex) is stored in network byte order.
+ * *outlen always receives KNET_HEADER_PING_V1_SIZE.
+ *
+ * Returns 0 on success, -1 if tx_seq_num_mutex cannot be locked (pingbuf
+ * is then left without an updated sequence number).
+ */
+int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen)
+{
+	*outlen = KNET_HEADER_PING_V1_SIZE;
+
+	/* common knet header */
+	knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
+	knet_h->pingbuf->kh_version = onwire_ver;
+	knet_h->pingbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER;
+	knet_h->pingbuf->kh_node = htons(knet_h->host_id);
+
+	/* v1 ping payload */
+	memcpy(&knet_h->pingbuf->khp_ping_v1_time[0], &clock_now, sizeof(struct timespec));
+	knet_h->pingbuf->khp_ping_v1_link = dst_link->link_id;
+	knet_h->pingbuf->khp_ping_v1_timed = timed;
+
+	if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex) == 0) {
+		knet_h->pingbuf->khp_ping_v1_seq_num = htons(knet_h->tx_seq_num);
+		pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
+		return 0;
+	}
+
+	log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock");
+	return -1;
+}
+
+/*
+ * Rewrite a received v1 ping in place into a pong: the packet type becomes
+ * KNET_HEADER_TYPE_PONG and the source node becomes this host. The payload,
+ * including the sender's timestamp, is left untouched so it is echoed back.
+ * *outlen receives KNET_HEADER_PING_V1_SIZE.
+ */
+void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen)
+{
+	inbuf->kh_node = htons(knet_h->host_id);
+	inbuf->kh_type = KNET_HEADER_TYPE_PONG;
+	*outlen = KNET_HEADER_PING_V1_SIZE;
+}
+
+/*
+ * Update src_host's rx sequence-number caches from a received v1 ping.
+ *
+ * Timed pings arrive in bursts over every link: only the first one carrying
+ * a new seq num is cached, and a seq num of 0 triggers a lookup that wipes
+ * the buffers.
+ * Untimed pings: the first one with a new seq num is cached, and the buffers
+ * are wiped only if no data was received from the host since the previous
+ * untimed ping (got_data). _seq_num_lookup() runs for every untimed ping.
+ *
+ * knet_h, src_link and len are not used by this function.
+ */
+void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len)
+{
+	seq_num_t seq = ntohs(inbuf->khp_ping_v1_seq_num);
+	int wipe = 0;
+
+	if (inbuf->khp_ping_v1_timed) {
+		if (seq == src_host->timed_rx_seq_num) {
+			/* already handled on another link */
+			return;
+		}
+		src_host->timed_rx_seq_num = seq;
+		if (seq == 0) {
+			_seq_num_lookup(src_host, seq, 0, 1);
+		}
+		return;
+	}
+
+	if (seq != src_host->untimed_rx_seq_num) {
+		src_host->untimed_rx_seq_num = seq;
+		/* data in between untimed pings means there is nothing to wipe */
+		if (!src_host->got_data) {
+			wipe = 1;
+		} else {
+			src_host->got_data = 0;
+		}
+	}
+	_seq_num_lookup(src_host, seq, 0, wipe);
+}
+
+/*
+ * Copy the send timestamp echoed back in a v1 pong into *recvtime.
+ * knet_h, src_host and src_link are not used by this function.
+ */
+void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime)
+{
+	const void *stamp = &inbuf->khp_ping_v1_time[0];
+
+	memcpy(recvtime, stamp, sizeof(*recvtime));
+}
diff --git a/libknet/onwire_v1.h b/libknet/onwire_v1.h
new file mode 100644
index 00000000..14531c13
--- /dev/null
+++ b/libknet/onwire_v1.h
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2020 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
+ *
+ * This software licensed under LGPL-2.0+
+ */
+
+#ifndef __KNET_ONWIRE_V1_H__
+#define __KNET_ONWIRE_V1_H__
+
+#include <stdint.h>
+
+#include "internals.h"
+
+/*
+ * Onwire version 1 ping/pong helpers, selected by threads_heartbeat.c when
+ * the onwire version is 1.
+ * NOTE(review): the prototypes use struct timespec and ssize_t but this header
+ * only includes <stdint.h> and internals.h; presumably internals.h provides
+ * them - confirm.
+ */
+
+/* Fill knet_h->pingbuf with a v1 ping for dst_link; *outlen gets the packet size. Returns 0, or -1 on lock failure. */
+int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen);
+/* Turn the received v1 ping in inbuf into a pong in place; *outlen gets the packet size. */
+void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen);
+/* Update src_host's rx sequence-number tracking from a received v1 ping. */
+void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len);
+/* Copy the timestamp echoed in a v1 pong into *recvtime. */
+void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime);
+
+#endif
diff --git a/libknet/tests/pckt_test.c b/libknet/tests/pckt_test.c
index 30798f3c..2e8b04f6 100644
--- a/libknet/tests/pckt_test.c
+++ b/libknet/tests/pckt_test.c
@@ -1,23 +1,23 @@
/*
* Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include <stdio.h>
#include "onwire.h"
+/*
+ * Print the size of the common knet header and of each payload-specific
+ * packet header (the payload struct size alone is shown in parentheses).
+ */
int main(void)
{
printf("\nKronosnet network header size printout:\n\n");
printf("KNET_HEADER_ALL_SIZE: %zu\n", KNET_HEADER_ALL_SIZE);
printf("KNET_HEADER_SIZE: %zu\n", KNET_HEADER_SIZE);
- printf("KNET_HEADER_PING_SIZE: %zu (%zu)\n", KNET_HEADER_PING_SIZE, sizeof(struct knet_header_payload_ping));
+ printf("KNET_HEADER_PING_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PING_V1_SIZE, sizeof(struct knet_header_payload_ping_v1));
printf("KNET_HEADER_PMTUD_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_SIZE, sizeof(struct knet_header_payload_pmtud));
printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data));
return 0;
}
diff --git a/libknet/threads_heartbeat.c b/libknet/threads_heartbeat.c
index 23279f7f..1cfb0a3f 100644
--- a/libknet/threads_heartbeat.c
+++ b/libknet/threads_heartbeat.c
@@ -1,404 +1,394 @@
/*
* Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
+#include "onwire_v1.h"
+/*
+ * Reset dst_link's pmtud and pong tracking. Zeroing pong_last.tv_nsec
+ * disables send_ping()'s pong-timeout check until a new pong arrives.
+ * If the link is currently connected, log it and report it down via
+ * _link_updown().
+ */
static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
memset(&dst_link->pmtud_last, 0, sizeof(struct timespec));
dst_link->received_pong = 0;
dst_link->status.pong_last.tv_nsec = 0;
dst_link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
if (dst_link->status.connected == 1) {
log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
dst_host->host_id, dst_link->link_id);
_link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0, 1);
}
}
+/*
+ * Send a ping to dst_host over dst_link when the ping interval has elapsed,
+ * or unconditionally when timed == 0. The handle's current onwire version
+ * (read under onwire_mutex) selects the packet builder, and the packet is
+ * encrypted when crypto is configured. A link whose transport is not
+ * connected is reported down immediately. Unless an earlier step bails
+ * out, the link is also reported down when a pong has been recorded and it
+ * is older than pong_timeout_adj.
+ */
static void send_ping(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
{
int err = 0, savederrno = 0, stats_err = 0;
- int len;
+ ssize_t len; /* sendto() returns ssize_t; compared against outlen below */
- ssize_t outlen = KNET_HEADER_PING_SIZE;
+ ssize_t outlen;
struct timespec clock_now, pong_last;
unsigned long long diff_ping;
unsigned char *outbuf = (unsigned char *)knet_h->pingbuf;
+ uint8_t onwire_ver;
if (dst_link->transport_connected == 0) {
_link_down(knet_h, dst_host, dst_link);
return;
}
/* caching last pong to avoid race conditions */
pong_last = dst_link->status.pong_last;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get monotonic clock");
return;
}
timespec_diff(dst_link->ping_last, clock_now, &diff_ping);
if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) {
- /* preparing ping buffer */
- knet_h->pingbuf->kh_version = knet_h->onwire_ver;
- knet_h->pingbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER;
- knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
- knet_h->pingbuf->kh_node = htons(knet_h->host_id);
-
- memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec));
- knet_h->pingbuf->khp_ping_link = dst_link->link_id;
- if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
- log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock");
+ if (pthread_mutex_lock(&knet_h->onwire_mutex)) {
+ log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get onwire mutex lock");
return;
}
- knet_h->pingbuf->khp_ping_seq_num = htons(knet_h->tx_seq_num);
- pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
- knet_h->pingbuf->khp_ping_timed = timed;
+ onwire_ver = knet_h->onwire_ver;
+ pthread_mutex_unlock(&knet_h->onwire_mutex);
+
+ switch (onwire_ver) {
+ case 1:
+ if (prep_ping_v1(knet_h, dst_link, onwire_ver, clock_now, timed, &outlen) < 0) {
+ return;
+ }
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_HEARTBEAT, "preparing ping onwire version %u not supported", onwire_ver);
+ return;
+ }
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pingbuf,
outlen,
knet_h->pingbuf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to crypto ping packet");
return;
}
outbuf = knet_h->pingbuf_crypt;
- if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) {
+ /* pthread_mutex_lock() returns a positive error number on failure, never < 0 */
+ if (pthread_mutex_lock(&knet_h->handle_stats_mutex) != 0) {
log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get mutex lock");
return;
}
knet_h->stats_extra.tx_crypt_ping_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
stats_err = pthread_mutex_lock(&dst_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get stats mutex lock for host %u link %u: %s",
dst_host->host_id, dst_link->link_id, strerror(stats_err));
return;
}
retry:
if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
dst_link->ping_last = clock_now;
dst_link->status.stats.tx_ping_packets++;
dst_link->status.stats.tx_ping_bytes += outlen;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_HEARTBEAT,
"Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
dst_link->outsock, savederrno, strerror(savederrno),
dst_link->status.src_ipaddr, dst_link->status.src_port,
dst_link->status.dst_ipaddr, dst_link->status.dst_port);
dst_link->status.stats.tx_ping_errors++;
break;
case 0:
break;
case 1:
dst_link->status.stats.tx_ping_retries++;
goto retry;
break;
}
} else {
dst_link->last_ping_size = outlen;
}
pthread_mutex_unlock(&dst_link->link_stats_mutex);
}
timespec_diff(pong_last, clock_now, &diff_ping);
if ((pong_last.tv_nsec) &&
(diff_ping >= (dst_link->pong_timeout_adj * 1000llu))) {
_link_down(knet_h, dst_host, dst_link);
}
}
+/*
+ * Reply to the ping in inbuf on src_link. The packet is rewritten in place
+ * into a pong for its kh_version, encrypted into recv_from_links_buf_crypt
+ * when crypto is configured, and sent back only if the link's transport is
+ * connected. Pings with an unsupported kh_version are logged and dropped.
+ */
-static void send_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf) {
+static void send_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf)
+{
int err = 0, savederrno = 0, stats_err = 0;
unsigned char *outbuf = (unsigned char *)inbuf;
ssize_t len, outlen;
- outlen = KNET_HEADER_PING_SIZE;
- inbuf->kh_type = KNET_HEADER_TYPE_PONG;
- inbuf->kh_node = htons(knet_h->host_id);
+ switch (inbuf->kh_version) {
+ case 1:
+ prep_pong_v1(knet_h, inbuf, &outlen);
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_HEARTBEAT, "preparing pong onwire version %u not supported", inbuf->kh_version);
+ return;
+ }
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to encrypt pong packet");
return;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
- if (stats_err < 0) {
+ /* pthread_mutex_lock() returns a positive error number on failure, never < 0 */
+ if (stats_err) {
log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
knet_h->stats_extra.tx_crypt_pong_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
retry:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_HEARTBEAT,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
- src_link->outsock, errno, strerror(errno),
+ src_link->outsock, savederrno, strerror(savederrno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
}
}
+/*
+ * Account a ping received on src_link (rx_ping_packets/rx_ping_bytes), hand
+ * it to the parser for its kh_version and answer it with a pong. Pings with
+ * an unsupported kh_version are counted and logged, but not answered.
+ */
-void process_ping(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len) {
- int wipe_bufs = 0;
- seq_num_t recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
-
+void process_ping(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len)
+{
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
- if (!inbuf->khp_ping_timed) {
- /*
- * we might be receiving this message from all links, but we want
- * to process it only the first time
- */
- if (recv_seq_num != src_host->untimed_rx_seq_num) {
- /*
- * cache the untimed seq num
- */
- src_host->untimed_rx_seq_num = recv_seq_num;
- /*
- * if the host has received data in between
- * untimed ping, then we don't need to wipe the bufs
- */
- if (src_host->got_data) {
- src_host->got_data = 0;
- wipe_bufs = 0;
- } else {
- wipe_bufs = 1;
- }
- }
- _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
- } else {
- /*
- * pings always arrives in bursts over all the link
- * catch the first of them to cache the seq num and
- * avoid duplicate processing
- */
- if (recv_seq_num != src_host->timed_rx_seq_num) {
- src_host->timed_rx_seq_num = recv_seq_num;
-
- if (recv_seq_num == 0) {
- _seq_num_lookup(src_host, recv_seq_num, 0, 1);
- }
- }
+ switch (inbuf->kh_version) {
+ case 1:
+ process_ping_v1(knet_h, src_host, src_link, inbuf, len);
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_HEARTBEAT, "parsing ping onwire version %u not supported", inbuf->kh_version);
+ return;
}
send_pong(knet_h, src_host, src_link, inbuf);
}
+/*
+ * Handle a pong received on src_link. The function:
+ * - refreshes pong_last and the rx_pong counters;
+ * - extracts the echoed send timestamp for the pong's kh_version and derives
+ *   the round-trip latency;
+ * - discards pongs slower than pong_timeout;
+ * - otherwise folds the latency into latency_ave/min/max.
+ * While the link is not connected and latency_ave is below pong_timeout_adj,
+ * received_pong is incremented, and the link is reported up once it has
+ * reached pong_count.
+ * NOTE(review): pong_last is refreshed before the version check, so a pong
+ * with an unsupported kh_version still resets the pong timeout evaluated in
+ * send_ping(); confirm this is intended.
+ */
-void process_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len) {
+void process_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len)
+{
struct timespec recvtime;
unsigned long long latency_last;
+ clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
+
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
- clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
- memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
+ switch (inbuf->kh_version) {
+ case 1:
+ process_pong_v1(knet_h, src_host, src_link, inbuf, &recvtime);
+ break;
+ default:
+ log_warn(knet_h, KNET_SUB_HEARTBEAT, "parsing pong onwire version %u not supported", inbuf->kh_version);
+ return;
+ }
+
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
if ((latency_last / 1000llu) > src_link->pong_timeout) {
- log_debug(knet_h, KNET_SUB_RX,
+ log_debug(knet_h, KNET_SUB_HEARTBEAT,
"Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
src_host->host_id, src_link->link_id);
} else {
/*
* in words : ('previous mean' * '(count -1)') + 'new value') / 'count'
*/
src_link->status.stats.latency_samples++;
/*
* limit to max_samples (precision)
*/
if (src_link->status.stats.latency_samples >= src_link->latency_max_samples) {
src_link->status.stats.latency_samples = src_link->latency_max_samples;
}
src_link->status.stats.latency_ave =
(((src_link->status.stats.latency_ave * (src_link->status.stats.latency_samples - 1)) + (latency_last / 1000llu)) / src_link->status.stats.latency_samples);
if (src_link->status.stats.latency_ave < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
- log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
+ log_info(knet_h, KNET_SUB_HEARTBEAT, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
- log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
+ log_debug(knet_h, KNET_SUB_HEARTBEAT, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.stats.latency_ave > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.stats.latency_ave;
}
if (src_link->status.stats.latency_ave < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.stats.latency_ave;
}
}
}
+/*
+ * Under hb_mutex, call send_ping() (timed or forced, per the timed flag) for
+ * every enabled, non-loopback link of every known host, skipping dynamic-IP
+ * links that are not yet dynconnected.
+ */
void _send_pings(knet_handle_t knet_h, int timed)
{
struct knet_host *dst_host;
int link_idx;
if (pthread_mutex_lock(&knet_h->hb_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock");
return;
}
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;
send_ping(knet_h, dst_host, &dst_host->link[link_idx], timed);
}
}
pthread_mutex_unlock(&knet_h->hb_mutex);
}
+/*
+ * Under backoff_mutex, for every link that _send_pings() would ping:
+ * - decrement pong_timeout_backoff while it is above 1;
+ * - recompute pong_timeout_adj as
+ *   pong_timeout * pong_timeout_backoff + latency_ave * KNET_LINK_PONG_TIMEOUT_LAT_MUL.
+ */
static void _adjust_pong_timeouts(knet_handle_t knet_h)
{
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get backoff_mutex");
return;
}
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;
dst_link = &dst_host->link[link_idx];
if (dst_link->pong_timeout_backoff > 1) {
dst_link->pong_timeout_backoff--;
}
dst_link->pong_timeout_adj = (dst_link->pong_timeout * dst_link->pong_timeout_backoff) + (dst_link->status.stats.latency_ave * KNET_LINK_PONG_TIMEOUT_LAT_MUL);
}
}
pthread_mutex_unlock(&knet_h->backoff_mutex);
}
+/*
+ * Heartbeat thread body. Until shutdown, it sleeps threads_timer_res
+ * microseconds, then takes global_rwlock for reading and sends timed pings.
+ * About once per second it also adjusts pong timeouts.
+ * NOTE(review): (1000000 / threads_timer_res) is 0 when threads_timer_res is
+ * greater than 1000000 (and the division itself fails when it is 0), making
+ * the modulo below a division by zero; presumably the setter bounds
+ * threads_timer_res - confirm.
+ */
void *_handle_heartbt_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
int i = 1;
set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED);
while (!shutdown_in_progress(knet_h)) {
usleep(knet_h->threads_timer_res);
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock");
continue;
}
/*
* _adjust_pong_timeouts should execute approx once a second.
*/
if ((i % (1000000 / knet_h->threads_timer_res)) == 0) {
_adjust_pong_timeouts(knet_h);
i = 1;
} else {
i++;
}
_send_pings(knet_h, 1);
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STOPPED);
return NULL;
}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 25927897..6655c60b 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,818 +1,825 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
+ * (tv_sec is compared first; tv_nsec only breaks ties)
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this functions needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
+ * On -1, errno is ETIME: the seq num was already seen and its buffer has
+ * expired, so defragmenting it again is pointless. A new seq num is marked
+ * as seen; if no buffer is free, the one with the oldest last_update is
+ * reclaimed (its in_use flag cleared) and returned.
+ * NOTE(review): inbuf->kh_node is used directly as a host_index index and
+ * src_host is not NULL-checked; presumably the caller has already converted
+ * and validated it - confirm. The loops are bounded by KNET_MAX_LINK, which is
+ * assumed to equal the number of defrag_buf entries.
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
/*
 * Store one fragment of a fragmented data packet and, once every fragment
 * has arrived, reassemble the payload in place.
 *
 * inbuf: the received fragment. khp_data_frag_seq is 1-based and
 *        khp_data_frag_num is the number of fragments; the fragment with
 *        khp_data_frag_seq == khp_data_frag_num is the last one.
 * len:   on entry, the payload length of this fragment (the caller has
 *        already stripped KNET_HEADER_DATA_SIZE). When 0 is returned it is
 *        updated to the length of the reassembled payload.
 *
 * Returns 0 when the packet is complete and its payload has been copied
 * back into inbuf->khp_data_userdata. Returns 1 when more fragments are
 * needed or the fragment was discarded (duplicate, expired or malformed).
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;
	int is_last_frag;
	size_t frag_size;
	size_t reassembled_len;

	/*
	 * Fragment numbering and length come straight from the wire. Reject
	 * values that would produce a negative or out-of-range offset into
	 * defrag_buf->buf before the fragment can claim a defrag buffer.
	 */
	if ((inbuf->khp_data_frag_seq == 0) ||
	    (inbuf->khp_data_frag_seq > inbuf->khp_data_frag_num)) {
		log_debug(knet_h, KNET_SUB_RX, "Discarding fragment with invalid sequence %u/%u",
			  (unsigned int)inbuf->khp_data_frag_seq, (unsigned int)inbuf->khp_data_frag_num);
		return 1;
	}
	if ((*len < 0) || (*len > (ssize_t)KNET_MAX_PACKET_SIZE)) {
		log_debug(knet_h, KNET_SUB_RX, "Discarding fragment with invalid length %ld", (long)*len);
		return 1;
	}

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * If this fragment was already received, the buffer was not released.
	 * That means not all fragments have arrived yet.
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		return 1;
	}

	is_last_frag = (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num);

	/*
	 * Work out the fragment size this fragment will be stored with.
	 * Non-last fragments define it. The last one reuses whatever was
	 * learned so far (0 if nothing yet). Check that the copy fits in
	 * defrag_buf->buf before touching any buffer state.
	 */
	frag_size = is_last_frag ? (size_t)defrag_buf->frag_size : (size_t)*len;
	if ((frag_size) &&
	    ((((size_t)(inbuf->khp_data_frag_seq - 1) * frag_size) + (size_t)*len) > (size_t)KNET_MAX_PACKET_SIZE)) {
		log_debug(knet_h, KNET_SUB_RX, "Discarding fragment %u that would overflow the defrag buffer",
			  (unsigned int)inbuf->khp_data_frag_seq);
		return 1;
	}

	/*
	 * we need to handle the last packet with gloves due to its different size
	 */
	if (is_last_frag) {
		defrag_buf->last_frag_size = *len;

		/*
		 * If the last packet arrives first, we don't yet know its offset
		 * relative to the other fragments (that depends on their size).
		 * Store it at the end of the buffer, where it is safe, and keep
		 * its length so the offset can be restored later.
		 * The local MTU cannot be used for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
				inbuf->khp_data_userdata,
				*len);
		}
	} else {
		defrag_buf->frag_size = *len;
	}

	if (defrag_buf->frag_size) {
		memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
			inbuf->khp_data_userdata, *len);
	}

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		reassembled_len = ((size_t)(inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) +
				  defrag_buf->last_frag_size;

		/*
		 * Fragments of inconsistent sizes can add up to more than the
		 * buffer holds. Drop the whole packet rather than copy past the end.
		 */
		if (reassembled_len > (size_t)KNET_MAX_PACKET_SIZE) {
			log_debug(knet_h, KNET_SUB_RX, "Discarding reassembled packet of invalid length %zu",
				  reassembled_len);
			defrag_buf->in_use = 0;
			return 1;
		}

		/*
		 * special case the last pckt
		 */
		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */
		*len = (ssize_t)reassembled_len;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
/*
 * Deliver a received data packet to the local application socket.
 *
 * The packet goes through these steps:
 *   - update per-link and (when decrypted) per-handle crypto statistics;
 *   - drop it if its sequence number was already delivered;
 *   - reassemble it if fragmented;
 *   - decompress it if needed;
 *   - apply the optional dst_host_filter_fn;
 *   - write the payload to the socket registered for the packet's channel.
 *
 * len is the packet length including the KNET_HEADER_DATA_SIZE header.
 * decrypt_time is the time spent decrypting (0 when the packet was not
 * decrypted). _parse_recv_from_links() calls this with
 * src_link->link_stats_mutex held.
 */
static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time)
{
	int err = 0, stats_err = 0;
	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	struct iovec iov_out[1];
	int8_t channel;
	ssize_t outlen;
	size_t payload_len, written;

	/* data stats at the top for consistency with TX */
	src_link->status.stats.rx_data_packets++;
	src_link->status.stats.rx_data_bytes += len;

	if (decrypt_time) {
		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
		/* pthread_mutex_lock() reports failure with a positive error number, never < 0 */
		if (stats_err) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
			return;
		}
		/* Only update the crypto overhead for data packets. Mainly to be
		   consistent with TX */
		if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
			knet_h->stats.rx_crypt_time_min = decrypt_time;
		}
		if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
			knet_h->stats.rx_crypt_time_max = decrypt_time;
		}
		knet_h->stats.rx_crypt_time_ave =
			(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
			 decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
		knet_h->stats.rx_crypt_packets++;
		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
	}

	/*
	 * The caller only guarantees KNET_HEADER_SIZE + 1 bytes. All payload
	 * lengths below are computed as len - KNET_HEADER_DATA_SIZE, which must
	 * not go negative: it would become a huge size in memmove/writev.
	 */
	if (len < (ssize_t)KNET_HEADER_DATA_SIZE) {
		log_debug(knet_h, KNET_SUB_RX, "Data packet is too short: %ld", (long)len);
		return;
	}

	inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
	channel = inbuf->khp_data_channel;
	src_host->got_data = 1;

	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
		if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
			log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
		}
		return;
	}

	if (inbuf->khp_data_frag_num > 1) {
		/*
		 * len as received from the socket also includes extra stuff
		 * that the defrag code doesn't care about. So strip it
		 * here and readd only for repadding once we are done
		 * defragging
		 */
		len = len - KNET_HEADER_DATA_SIZE;
		if (pckt_defrag(knet_h, inbuf, &len)) {
			return;
		}
		len = len + KNET_HEADER_DATA_SIZE;
	}

	if (inbuf->khp_data_compress) {
		ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
		struct timespec start_time;
		struct timespec end_time;
		uint64_t compress_time;
		int decompress_errno;

		clock_gettime(CLOCK_MONOTONIC, &start_time);
		err = decompress(knet_h, inbuf->khp_data_compress,
				 (const unsigned char *)inbuf->khp_data_userdata,
				 len - KNET_HEADER_DATA_SIZE,
				 knet_h->recv_from_links_buf_decompress,
				 &decmp_outlen);
		/* save it now: the lock/clock calls below may overwrite errno */
		decompress_errno = errno;

		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
		if (stats_err) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
			return;
		}

		clock_gettime(CLOCK_MONOTONIC, &end_time);
		timespec_diff(start_time, end_time, &compress_time);

		if (err) {
			knet_h->stats.rx_failed_to_decompress++;
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
			log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
				 err, strerror(decompress_errno));
			return;
		}

		/* Collect stats */
		if (compress_time < knet_h->stats.rx_compress_time_min) {
			knet_h->stats.rx_compress_time_min = compress_time;
		}
		if (compress_time > knet_h->stats.rx_compress_time_max) {
			knet_h->stats.rx_compress_time_max = compress_time;
		}
		knet_h->stats.rx_compress_time_ave =
			(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
			 compress_time) / (knet_h->stats.rx_compressed_packets+1);
		knet_h->stats.rx_compressed_packets++;
		knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
		/* same compressed payload length that was handed to decompress() above */
		knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_DATA_SIZE;

		memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
		len = decmp_outlen + KNET_HEADER_DATA_SIZE;

		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
	}

	if (!src_host->status.reachable) {
		log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
		return;
	}

	if (knet_h->enabled != 1) { /* data forward is disabled */
		return;
	}

	if (knet_h->dst_host_filter_fn) {
		size_t host_idx;
		int found = 0;

		bcast = knet_h->dst_host_filter_fn(
				knet_h->dst_host_filter_fn_private_data,
				(const unsigned char *)inbuf->khp_data_userdata,
				len - KNET_HEADER_DATA_SIZE,
				KNET_NOTIFY_RX,
				knet_h->host_id,
				inbuf->kh_node,
				&channel,
				dst_host_ids,
				&dst_host_ids_entries);
		if (bcast < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
			return;
		}

		if ((!bcast) && (!dst_host_ids_entries)) {
			log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
			return;
		}

		/* check if we are dst for this packet */
		if (!bcast) {
			if (dst_host_ids_entries > KNET_MAX_HOST) {
				log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
				return;
			}
			for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
				if (dst_host_ids[host_idx] == knet_h->host_id) {
					found = 1;
					break;
				}
			}
			if (!found) {
				log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
				return;
			}
		}
	}

	/*
	 * channel comes from the wire (possibly rewritten by the filter
	 * callback): bound it before it indexes knet_h->sockfd[], as
	 * knet_recv() does for user-supplied channels.
	 */
	if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
		log_debug(knet_h, KNET_SUB_RX, "received packet for invalid channel %d", channel);
		return;
	}

	if (!knet_h->sockfd[channel].in_use) {
		log_debug(knet_h, KNET_SUB_RX,
			  "received packet for channel %d but there is no local sock connected",
			  channel);
		return;
	}

	/*
	 * Write the whole payload, continuing after short writes. "written"
	 * accumulates across iterations so each retry resumes where the
	 * previous writev() stopped.
	 */
	payload_len = (size_t)(len - KNET_HEADER_DATA_SIZE);
	written = 0;
	memset(iov_out, 0, sizeof(iov_out));
	do {
		iov_out[0].iov_base = (char *)inbuf->khp_data_userdata + written;
		iov_out[0].iov_len = payload_len - written;

		outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
		if (outlen <= 0) {
			knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
					       knet_h->sockfd[channel].sockfd[0],
					       channel,
					       KNET_NOTIFY_RX,
					       outlen,
					       errno);
			return;
		}

		written += (size_t)outlen;
		if (written < payload_len) {
			log_debug(knet_h, KNET_SUB_RX,
				  "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
				  iov_out[0].iov_len, outlen);
		}
	} while (written < payload_len);

	/* the full payload reached the application: mark seq_num as delivered */
	_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
/*
 * Parse one datagram received from a link socket and dispatch it by type.
 *
 * Decryption:
 *   - When any crypto instance is configured, the packet is authenticated
 *     and decrypted into knet_h->recv_from_links_buf_decrypt.
 *   - If that fails, the packet is dropped when clear traffic is
 *     disallowed; otherwise it is processed as clear data.
 *
 * Lookup:
 *   - The source host is looked up from kh_node.
 *   - The source link is taken from the ping header for pings, and from
 *     the sender address for every other packet type.
 *
 * Type handlers run with src_link->link_stats_mutex held. The PMTUd
 * handlers are the exception: they run after the mutex is released.
 */
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
	int stats_err = 0;
	ssize_t outlen;
	struct knet_host *src_host;
	struct knet_link *src_link;
	uint64_t decrypt_time = 0;
	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
	ssize_t len = msg->msg_len;
	int try_decrypt = 0, i, found_link = 0;

	for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
		if (knet_h->crypto_instance[i]) {
			try_decrypt = 1;
			break;
		}
	}

	if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
		log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
		return;
	}

	if (try_decrypt) {
		struct timespec start_time;
		struct timespec end_time;

		clock_gettime(CLOCK_MONOTONIC, &start_time);
		if (crypto_authenticate_and_decrypt(knet_h,
						    (unsigned char *)inbuf,
						    len,
						    knet_h->recv_from_links_buf_decrypt,
						    &outlen) < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
			if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
				return;
			}
			log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
		} else {
			clock_gettime(CLOCK_MONOTONIC, &end_time);
			timespec_diff(start_time, end_time, &decrypt_time);

			len = outlen;
			inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
		}
	}

	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
		return;
	}

	/*
	 * A version is unsupported when it is outside [MIN_VER, MAX_VER].
	 * The previous "&&" form could never be true.
	 */
	if ((inbuf->kh_version > KNET_HEADER_ONWIRE_MAX_VER) ||
	    (inbuf->kh_version < KNET_HEADER_ONWIRE_MIN_VER)) {
		if (KNET_HEADER_ONWIRE_MAX_VER > 1 ) {
			log_debug(knet_h, KNET_SUB_RX,
				  "Received packet version %u. current node only supports onwire version from %u to %u",
				  inbuf->kh_version, KNET_HEADER_ONWIRE_MIN_VER, KNET_HEADER_ONWIRE_MAX_VER);
		} else {
			log_debug(knet_h, KNET_SUB_RX,
				  "Received packet version %u. current node only supports %u",
				  inbuf->kh_version, KNET_HEADER_ONWIRE_MAX_VER);
		}
		return;
	}

	inbuf->kh_node = ntohs(inbuf->kh_node);
	src_host = knet_h->host_index[inbuf->kh_node];
	if (src_host == NULL) {  /* host not found */
		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
		return;
	}

- if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
- /* be aware this works only for PING / PONG and PMTUd packets! */
- src_link = &src_host->link[inbuf->khp_ping_link];
+	if (inbuf->kh_type == KNET_HEADER_TYPE_PING) {
+		switch (inbuf->kh_version) {
+			case 1:
+				/* link id comes from the wire: bound it before indexing src_host->link[] */
+				if (inbuf->khp_ping_v1_link >= KNET_MAX_LINK) {
+					log_debug(knet_h, KNET_SUB_RX, "Received ping for invalid link %u",
+						  (unsigned int)inbuf->khp_ping_v1_link);
+					return;
+				}
+				src_link = &src_host->link[inbuf->khp_ping_v1_link];
+				break;
+			default:
+				log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version);
+				return;
+		}
+
		if (src_link->dynamic == KNET_LINK_DYNIP) {
			if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
				log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
					  src_host->host_id, src_link->link_id);
				memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
				if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
						   src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
						   src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
					log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
					snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
					snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
				} else {
					log_info(knet_h, KNET_SUB_RX,
						 "host: %u link: %u new connection established from: %s %s",
						 src_host->host_id, src_link->link_id,
						 src_link->status.dst_ipaddr, src_link->status.dst_port);
				}
			}
			/*
			 * transport has already accepted the connection here
			 * otherwise we would not be receiving packets
			 */
			transport_link_dyn_connect(knet_h, sockfd, src_link);
		}
	} else { /* all other packet types: identify the link by the sender address */
		for (i = 0; i < KNET_MAX_LINK; i++) {
			src_link = &src_host->link[i];
			if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
				found_link = 1;
				break;
			}
		}
		if (!found_link) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
			return;
		}
	}

	stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
	if (stats_err) {
		/* report the lock error itself (the old code printed an always-zero errno) */
		log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
			src_host->host_id, src_link->link_id, strerror(stats_err));
		return;
	}

	switch (inbuf->kh_type) {
	case KNET_HEADER_TYPE_DATA:
		process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time);
		break;
	case KNET_HEADER_TYPE_PING:
		process_ping(knet_h, src_host, src_link, inbuf, len);
		break;
	case KNET_HEADER_TYPE_PONG:
		process_pong(knet_h, src_host, src_link, inbuf, len);
		break;
	case KNET_HEADER_TYPE_PMTUD:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;
		/* Unlock so we don't deadlock with tx_mutex */
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		process_pmtud(knet_h, src_link, inbuf);
		return; /* Don't need to unlock link_stats_mutex */
	case KNET_HEADER_TYPE_PMTUD_REPLY:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;
		/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		process_pmtud_reply(knet_h, src_link, inbuf);
		return;
	default:
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		return;
	}

	pthread_mutex_unlock(&src_link->link_stats_mutex);
}
/*
 * Drain up to PCKT_RX_BUFS datagrams from sockfd with one _recvmmsg() call
 * and classify each one through the owning transport.
 *
 * Runs under the global read lock. For packets the transport reports as
 * data:
 *   - they are checked against the access lists when access lists are
 *     enabled and the transport uses the generic ACL;
 *   - accepted packets are handed to _parse_recv_from_links().
 * The transport may also ask to stop processing the rest of the batch.
 */
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	int rx_verdict, recv_errno;
	int idx, received, transport;

	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
		return;
	}

	/*
	 * Another thread can remove the link between the epoll event and
	 * our taking the read lock. That is expected, so just bail out.
	 */
	if (_is_valid_fd(knet_h, sockfd) < 1) {
		goto out_unlock;
	}

	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;

	/*
	 * After the previous recvmmsg, each msg_namelen holds the size of the
	 * actual sockaddr_in/sockaddr_in6. Restore the full buffer size.
	 */
	for (idx = 0; idx < PCKT_RX_BUFS; idx++) {
		msg[idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
	}

	received = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
	recv_errno = errno;

	/*
	 * WARNING: the recvmmsg man page is wrong. The kernel implementation
	 * returns:
	 *   -1 on error in this run;
	 *    0 when a previous run recorded an error on the socket;
	 *    N the number of messages received.
	 *
	 * When recvmsg fails after one or more frames were received, the loop
	 * stops. The error is stored on the socket (getsockopt(SO_ERROR)) and
	 * becomes visible on the next run.
	 *
	 * Errors are therefore handled per transport/protocol:
	 *   - received < 0 is an error from this run;
	 *   - received == 0 is an error from a previous run, and the socket
	 *     error must be cleared;
	 *   - per-message data is checked by the transport. For example, SCTP
	 *     uses MSG_NOTIFICATION and treats msg_len == 0 as EOF, while for
	 *     UDP a 0-byte message is perfectly legal.
	 *
	 * On an SCTP MSG_NOTIFICATION we get received == PCKT_FRAG_MAX and no
	 * errno, so the transport must be able to abort the loop below.
	 */
	if (received <= 0) {
		transport_rx_sock_error(knet_h, transport, sockfd, received, recv_errno);
		goto out_unlock;
	}

	for (idx = 0; idx < received; idx++) {
		rx_verdict = transport_rx_is_data(knet_h, transport, sockfd, &msg[idx]);

		/*
		 * TODO: make this section silent once we are confident
		 * all protocols packet handlers are good
		 */
		switch (rx_verdict) {
		case KNET_TRANSPORT_RX_ERROR: /* on error */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
			goto out_unlock;
		case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* not data, keep going */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
			break;
		case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* not data, stop processing the batch */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
			goto out_unlock;
		case KNET_TRANSPORT_RX_IS_DATA: /* data: apply access lists, then parse */
			if ((knet_h->use_access_lists) &&
			    (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
			    (!check_validate(knet_h, sockfd, transport, msg[idx].msg_hdr.msg_name))) {
				char src_ipaddr[KNET_MAX_HOST_LEN];
				char src_port[KNET_MAX_PORT_LEN];

				memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
				memset(src_port, 0, KNET_MAX_PORT_LEN);

				if (knet_addrtostr(msg[idx].msg_hdr.msg_name, sockaddr_len(msg[idx].msg_hdr.msg_name),
						   src_ipaddr, KNET_MAX_HOST_LEN,
						   src_port, KNET_MAX_PORT_LEN) < 0) {
					log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
				} else {
					log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
				}

				/* drop only this packet, keep processing the rest of the batch */
				continue;
			}
			_parse_recv_from_links(knet_h, sockfd, &msg[idx]);
			break;
		case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
			log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
			break;
		case KNET_TRANSPORT_RX_OOB_DATA_STOP:
			log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
			goto out_unlock;
		}
	}

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
}
/*
 * RX thread entry point.
 *
 * Each of the PCKT_RX_BUFS knet_mmsghdr slots is bound to its own receive
 * buffer and address storage. The thread then loops until shutdown:
 *   - wait on recv_from_links_epollfd with a timeout of
 *     threads_timer_res / 1000 (epoll_wait takes milliseconds);
 *   - acknowledge any pending queue flush request;
 *   - process every ready socket.
 * A timeout only serves to re-check shutdown_in_progress().
 */
void *_handle_recv_from_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_RX_BUFS];
	struct knet_mmsghdr msg[PCKT_RX_BUFS];
	struct iovec iov_in[PCKT_RX_BUFS];
	int slot, ready;

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);

	memset(&msg, 0, sizeof(msg));
	memset(&events, 0, sizeof(events));

	/* wire every mmsghdr slot to its own rx buffer and peer address */
	for (slot = 0; slot < PCKT_RX_BUFS; slot++) {
		iov_in[slot].iov_base = (void *)knet_h->recv_from_links_buf[slot];
		iov_in[slot].iov_len = KNET_DATABUFSIZE;

		memset(&msg[slot].msg_hdr, 0, sizeof(struct msghdr));
		msg[slot].msg_hdr.msg_name = &address[slot];
		msg[slot].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
		msg[slot].msg_hdr.msg_iov = &iov_in[slot];
		msg[slot].msg_hdr.msg_iovlen = 1;
	}

	while (!shutdown_in_progress(knet_h)) {
		ready = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);

		/*
		 * After a queue flush is requested, the RX thread only needs to
		 * report that at least one run has completed.
		 * See setfwd in handle.c
		 */
		if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
			set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
		}

		/* timeout (0) or error (-1): nothing to process, loop to re-check shutdown */
		for (slot = 0; slot < ready; slot++) {
			_handle_recv_from_links(knet_h, events[slot].data.fd, msg);
		}
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);

	return NULL;
}
/*
 * Read one chunk of application data for channel into buff.
 *
 * Arguments are validated up front:
 *   - knet_h and buff must be non-NULL;
 *   - buff_len must be in 1..KNET_MAX_PACKET_SIZE;
 *   - channel must be in 0..KNET_DATAFD_MAX - 1.
 * On any violation the function returns -1 with errno = EINVAL.
 *
 * Under the global read lock:
 *   - if channel has no socket in use, returns -1 with errno = EINVAL;
 *   - otherwise reads with readv() from sockfd[0] of the channel and
 *     returns its result.
 * errno is set to 0 when readv() returns 0, and to the saved error value
 * otherwise.
 */
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	int saved_errno = 0;
	ssize_t ret = 0;
	struct iovec iov_in;

	if ((!knet_h) ||
	    (buff == NULL) ||
	    (buff_len == 0) ||
	    (buff_len > KNET_MAX_PACKET_SIZE) ||
	    (channel < 0) ||
	    (channel >= KNET_DATAFD_MAX)) {
		errno = EINVAL;
		return -1;
	}

	saved_errno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (saved_errno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(saved_errno));
		errno = saved_errno;
		return -1;
	}

	if (knet_h->sockfd[channel].in_use) {
		memset(&iov_in, 0, sizeof(iov_in));
		iov_in.iov_base = (void *)buff;
		iov_in.iov_len = buff_len;

		ret = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
		saved_errno = errno;
	} else {
		saved_errno = EINVAL;
		ret = -1;
	}

	pthread_rwlock_unlock(&knet_h->global_rwlock);

	errno = ret ? saved_errno : 0;
	return ret;
}

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 10, 12:41 AM (5 h, 51 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2009344
Default Alt Text
(56 KB)

Event Timeline