diff --git a/libknet/onwire.h b/libknet/onwire.h index f9fb218d..2249bfe9 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -1,156 +1,166 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_ONWIRE_H__ #define __KNET_ONWIRE_H__ #include #include "libknet.h" /* * data structures to define network packets. * Start from knet_header at the bottom */ /* * Plan is to support MAX_VER with MIN_VER = MAX_VER - 1 * but for the sake of not rewriting the world later on, * let´s make sure we can support a random range of protocol * versions */ #define KNET_HEADER_ONWIRE_MAX_VER 0x01 /* max onwire protocol supported by this build */ #define KNET_HEADER_ONWIRE_MIN_VER 0x01 /* min onwire protocol supported by this build */ /* * Packet types + * + * adding new DATA types requires the packet to contain + * data_seq_num and frag_num/frag_seq in the current data types. + * + * Changing those data types requires major surgery to thread_tx/thread_rx + * and defrag buffer allocation in knet_host_add. + * + * Also please be aware that frags buffer allocation size is not constant + * so you cannot assume each frag is 64K+. + * (see handle.c) */ #define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */ #define KNET_HEADER_TYPE_PING 0x81 /* heartbeat */ #define KNET_HEADER_TYPE_PONG 0x82 /* reply to heartbeat */ #define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */ #define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */ /* * KNET_HEADER_TYPE_DATA */ typedef uint16_t seq_num_t; /* data sequence number required to deduplicate pckts */ #define SEQ_MAX UINT16_MAX -struct knet_header_payload_data { +struct knet_header_payload_data_v1 { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pckts */ uint8_t khp_data_compress; /* identify if user data are compressed */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ uint8_t khp_data_bcast; /* data destination bcast/ucast */ uint8_t khp_data_frag_num; /* number of fragments of this pckt. 
1 is not fragmented */ uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */ int8_t khp_data_channel; /* transport channel data for localsock <-> knet <-> localsock mapping */ uint8_t khp_data_userdata[0]; /* pointer to the real user data */ } __attribute__((packed)); -#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num -#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num -#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq -#define khp_data_userdata kh_payload.khp_data.khp_data_userdata -#define khp_data_bcast kh_payload.khp_data.khp_data_bcast -#define khp_data_channel kh_payload.khp_data.khp_data_channel -#define khp_data_compress kh_payload.khp_data.khp_data_compress +#define khp_data_v1_seq_num kh_payload.khp_data_v1.khp_data_seq_num +#define khp_data_v1_frag_num kh_payload.khp_data_v1.khp_data_frag_num +#define khp_data_v1_frag_seq kh_payload.khp_data_v1.khp_data_frag_seq +#define khp_data_v1_userdata kh_payload.khp_data_v1.khp_data_userdata +#define khp_data_v1_bcast kh_payload.khp_data_v1.khp_data_bcast +#define khp_data_v1_channel kh_payload.khp_data_v1.khp_data_channel +#define khp_data_v1_compress kh_payload.khp_data_v1.khp_data_compress /* * KNET_HEADER_TYPE_PING / KNET_HEADER_TYPE_PONG */ struct knet_header_payload_ping_v1 { uint8_t khp_ping_link; /* changing khp_ping_link requires changes to thread_rx.c KNET_LINK_DYNIP code handling */ uint32_t khp_ping_time[4]; /* ping timestamp */ seq_num_t khp_ping_seq_num; /* transport host seq_num */ uint8_t khp_ping_timed; /* timed pinged (1) or forced by seq_num (0) */ } __attribute__((packed)); #define khp_ping_v1_link kh_payload.khp_ping_v1.khp_ping_link #define khp_ping_v1_time kh_payload.khp_ping_v1.khp_ping_time #define khp_ping_v1_seq_num kh_payload.khp_ping_v1.khp_ping_seq_num #define khp_ping_v1_timed kh_payload.khp_ping_v1.khp_ping_timed /* * KNET_HEADER_TYPE_PMTUD / KNET_HEADER_TYPE_PMTUD_REPLY */ /* * taken from tracepath6 */ #define KNET_PMTUD_SIZE_V4 65535 #define KNET_PMTUD_SIZE_V6 KNET_PMTUD_SIZE_V4 /* * IPv4/IPv6 header size */ #define KNET_PMTUD_OVERHEAD_V4 20 #define KNET_PMTUD_OVERHEAD_V6 40 #define KNET_PMTUD_MIN_MTU_V4 576 #define KNET_PMTUD_MIN_MTU_V6 1280 struct knet_header_payload_pmtud_v1 { uint8_t khp_pmtud_link; /* link_id */ uint16_t khp_pmtud_size; /* size of the current packet */ uint8_t khp_pmtud_data[0]; /* pointer to empty/random data/fill buffer */ } __attribute__((packed)); #define khp_pmtud_v1_link kh_payload.khp_pmtud_v1.khp_pmtud_link #define khp_pmtud_v1_size kh_payload.khp_pmtud_v1.khp_pmtud_size #define khp_pmtud_v1_data kh_payload.khp_pmtud_v1.khp_pmtud_data /* * PMTUd related functions */ size_t calc_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_max_data_outlen(knet_handle_t knet_h, size_t inlen); size_t calc_min_mtu(knet_handle_t knet_h); /* * union to reference possible individual payloads */ union knet_header_payload { - struct knet_header_payload_data khp_data; /* pure data packet struct */ + struct knet_header_payload_data_v1 khp_data_v1; /* pure data packet struct */ struct knet_header_payload_ping_v1 khp_ping_v1; /* heartbeat packet struct */ struct knet_header_payload_pmtud_v1 khp_pmtud_v1; /* Path MTU discovery packet struct */ } __attribute__((packed)); /* * this header CANNOT change or onwire compat will break! */ struct knet_header { uint8_t kh_version; /* this pckt format/version */ uint8_t kh_type; /* from above defines. 
Tells what kind of pckt it is */ knet_node_id_t kh_node; /* host id of the source host for this pckt */ uint8_t kh_max_ver; /* max version of the protocol supported by this node */ uint8_t kh_pad1; /* make sure to have space in the header to grow features */ union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */ } __attribute__((packed)); /* * extra defines to avoid mingling with sizeof() too much */ #define KNET_HEADER_ALL_SIZE sizeof(struct knet_header) #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload)) #define KNET_HEADER_PING_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping_v1)) #define KNET_HEADER_PMTUD_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud_v1)) -#define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data)) +#define KNET_HEADER_DATA_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data_v1)) #endif diff --git a/libknet/onwire_v1.c b/libknet/onwire_v1.c index b9abe1d1..0bf3bb5c 100644 --- a/libknet/onwire_v1.c +++ b/libknet/onwire_v1.c @@ -1,120 +1,216 @@ /* * Copyright (C) 2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #include "config.h" +#include #include #include #include #include #include #include "logging.h" #include "host.h" #include "links.h" #include "onwire_v1.h" int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; /* preparing ping buffer */ knet_h->pingbuf->kh_version = onwire_ver; knet_h->pingbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING; knet_h->pingbuf->kh_node = htons(knet_h->host_id); knet_h->pingbuf->khp_ping_v1_link = dst_link->link_id; knet_h->pingbuf->khp_ping_v1_timed = timed; memmove(&knet_h->pingbuf->khp_ping_v1_time[0], &clock_now, sizeof(struct timespec)); if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock"); return -1; } knet_h->pingbuf->khp_ping_v1_seq_num = htons(knet_h->tx_seq_num); pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); return 0; } void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PING_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); } void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len) { int wipe_bufs = 0; seq_num_t recv_seq_num = ntohs(inbuf->khp_ping_v1_seq_num); if (!inbuf->khp_ping_v1_timed) { /* * we might be receiving this message from all links, but we want * to process it only the first time */ if (recv_seq_num != src_host->untimed_rx_seq_num) { /* * cache the untimed seq num */ src_host->untimed_rx_seq_num = recv_seq_num; /* * if the host has received data in between * untimed ping, then we don't need to wipe the bufs */ if (src_host->got_data) { src_host->got_data = 0; wipe_bufs = 0; } else { wipe_bufs = 1; } } _seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs); } else { /* * pings always arrives in bursts over all the link * catch the first of them to cache the seq num and * avoid duplicate processing */ if (recv_seq_num != src_host->timed_rx_seq_num) { src_host->timed_rx_seq_num = recv_seq_num; if (recv_seq_num == 0) { _seq_num_lookup(src_host, recv_seq_num, 0, 1); } } } } void 
process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime) { memmove(recvtime, &inbuf->khp_ping_v1_time[0], sizeof(struct timespec)); } +struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf) +{ + return &src_host->link[inbuf->khp_ping_v1_link]; +} + void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len) { knet_h->pmtudbuf->kh_version = onwire_ver; knet_h->pmtudbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD; knet_h->pmtudbuf->kh_node = htons(knet_h->host_id); knet_h->pmtudbuf->khp_pmtud_v1_link = dst_link->link_id; knet_h->pmtudbuf->khp_pmtud_v1_size = onwire_len; } void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen) { *outlen = KNET_HEADER_PMTUD_V1_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); } void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf) { src_link->last_recv_mtu = inbuf->khp_pmtud_v1_size; } + +void prep_tx_bufs_v1(knet_handle_t knet_h, + struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu, + seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, + int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) +{ + uint8_t frag_idx = 0; + size_t frag_len = inlen; + + /* + * prepare the main header + */ + inbuf->kh_type = KNET_HEADER_TYPE_DATA; + inbuf->kh_version = 1; + inbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; + inbuf->kh_node = htons(knet_h->host_id); + + /* + * prepare the data header + */ + inbuf->khp_data_v1_frag_seq = 0; + inbuf->khp_data_v1_bcast = bcast; + inbuf->khp_data_v1_frag_num = ceil((float)inlen / temp_data_mtu); + inbuf->khp_data_v1_channel = channel; + inbuf->khp_data_v1_seq_num = htons(tx_seq_num); + if (data_compressed) { + inbuf->khp_data_v1_compress = knet_h->compress_model; + } else { + inbuf->khp_data_v1_compress = 0; + } + + /* + * handle fragmentation + */ + if (inbuf->khp_data_v1_frag_num > 1) { + while (frag_idx < inbuf->khp_data_v1_frag_num) { + /* + * set the iov_base + */ + iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; + iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_V1_SIZE; + iov_out[frag_idx][1].iov_base = data + (temp_data_mtu * frag_idx); + + /* + * set the len + */ + if (frag_len > temp_data_mtu) { + iov_out[frag_idx][1].iov_len = temp_data_mtu; + } else { + iov_out[frag_idx][1].iov_len = frag_len; + } + + /* + * copy the frag info on all buffers + */ + memmove(knet_h->send_to_links_buf[frag_idx], inbuf, KNET_HEADER_DATA_V1_SIZE); + /* + * bump the frag + */ + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_seq = frag_idx + 1; + + frag_len = frag_len - temp_data_mtu; + frag_idx++; + } + *iovcnt_out = 2; + } else { + iov_out[frag_idx][0].iov_base = (void *)inbuf; + iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_V1_SIZE; + *iovcnt_out = 1; + } + *msgs_to_send = inbuf->khp_data_v1_frag_num; +} + +unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf) +{ + return inbuf->khp_data_v1_userdata; +} + +void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf, + ssize_t *header_size, int8_t *channel, + seq_num_t *seq_num, uint8_t *decompress_type, + uint8_t *frags, uint8_t *frag_seq) +{ + *header_size = 
KNET_HEADER_DATA_V1_SIZE; + *channel = inbuf->khp_data_v1_channel; + *seq_num = ntohs(inbuf->khp_data_v1_seq_num); + *decompress_type = inbuf->khp_data_v1_compress; + *frags = inbuf->khp_data_v1_frag_num; + *frag_seq = inbuf->khp_data_v1_frag_seq; +} diff --git a/libknet/onwire_v1.h b/libknet/onwire_v1.h index 0068f094..c0ca636d 100644 --- a/libknet/onwire_v1.h +++ b/libknet/onwire_v1.h @@ -1,25 +1,37 @@ /* * Copyright (C) 2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_ONWIRE_V1_H__ #define __KNET_ONWIRE_V1_H__ #include #include "internals.h" int prep_ping_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, struct timespec clock_now, int timed, ssize_t *outlen); void prep_pong_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen); void process_ping_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len); void process_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, struct timespec *recvtime); +struct knet_link *get_link_from_pong_v1(knet_handle_t knet_h, struct knet_host *src_host, struct knet_header *inbuf); void prep_pmtud_v1(knet_handle_t knet_h, struct knet_link *dst_link, uint8_t onwire_ver, size_t onwire_len); void prep_pmtud_reply_v1(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *outlen); void process_pmtud_reply_v1(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf); +void prep_tx_bufs_v1(knet_handle_t knet_h, + struct knet_header *inbuf, unsigned char *data, size_t inlen, unsigned int temp_data_mtu, + seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, + int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out); + +unsigned char *get_data_v1(knet_handle_t knet_h, struct knet_header *inbuf); + +void get_data_header_info_v1(knet_handle_t knet_h, struct knet_header *inbuf, + ssize_t *header_size, int8_t *channel, + seq_num_t *seq_num, uint8_t *decompress_type, + uint8_t *frags, uint8_t *frag_seq); #endif diff --git a/libknet/tests/pckt_test.c b/libknet/tests/pckt_test.c index e9b73690..35e73c37 100644 --- a/libknet/tests/pckt_test.c +++ b/libknet/tests/pckt_test.c @@ -1,23 +1,23 @@ /* * Copyright (C) 2015-2020 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include #include "onwire.h" int main(void) { printf("\nKronosnet network header size printout:\n\n"); printf("KNET_HEADER_ALL_SIZE: %zu\n", KNET_HEADER_ALL_SIZE); printf("KNET_HEADER_SIZE: %zu\n", KNET_HEADER_SIZE); printf("KNET_HEADER_PING_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PING_V1_SIZE, sizeof(struct knet_header_payload_ping_v1)); printf("KNET_HEADER_PMTUD_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_V1_SIZE, sizeof(struct knet_header_payload_pmtud_v1)); - printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data)); + printf("KNET_HEADER_DATA_V1_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_V1_SIZE, sizeof(struct knet_header_payload_data_v1)); return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 6655c60b..2f7c27c0 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,825 +1,919 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_rx.h" #include "netutils.h" +#include "onwire_v1.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ -static inline int timecmp(struct timespec a, struct timespec b) +static inline int _timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ -static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) +static int _find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num) { - struct knet_host *src_host = knet_h->host_index[inbuf->kh_node]; int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { - if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { + if (src_host->defrag_buf[i].pckt_seq == seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { + if (!_seq_num_lookup(src_host, seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ - _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); + _seq_num_set(src_host, seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_MAX_LINK; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. 
*/ oldest = 0; for (i = 0; i < KNET_MAX_LINK; i++) { - if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { + if (_timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } -static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len) +static int _pckt_defrag(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num, unsigned char *data, ssize_t *len, uint8_t frags, uint8_t frag_seq) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; - defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf); + defrag_buf_idx = _find_pckt_defrag_buf(knet_h, src_host, seq_num); if (defrag_buf_idx < 0) { return 1; } - defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx]; + defrag_buf = &src_host->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; - defrag_buf->pckt_seq = inbuf->khp_data_seq_num; + defrag_buf->pckt_seq = seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ - if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { + if (defrag_buf->frag_map[frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ - if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { + if (frag_seq == frags) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. 
*/ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), - inbuf->khp_data_userdata, + data, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); + memmove(defrag_buf->buf + ((frag_seq - 1) * defrag_buf->frag_size), + data, *len); } defrag_buf->frag_recv++; - defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; + defrag_buf->frag_map[frag_seq] = 1; /* * check if we received all the fragments */ - if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { + if (defrag_buf->frag_recv == frags) { /* * special case the last pckt */ if (defrag_buf->last_first) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), + memmove(defrag_buf->buf + ((frags - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ - *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; + *len = ((frags - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ - memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); + memmove(data, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } -static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time) +static int _handle_data_stats(knet_handle_t knet_h, struct knet_link *src_link, ssize_t len, uint64_t decrypt_time) { - int err = 0, stats_err = 0; - knet_node_id_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct iovec iov_out[1]; - int8_t channel; - ssize_t outlen; + int stats_err; /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypt_time) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); - return; + return -1; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } + return 0; +} - inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); - channel = inbuf->khp_data_channel; - src_host->got_data = 1; - - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { - if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { - log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); - } - return; - } - - if (inbuf->khp_data_frag_num > 1) { - /* - * len as received from the socket also includes extra stuff - * that the defrag code doesn't care about. 
So strip it - * here and readd only for repadding once we are done - * defragging - */ - len = len - KNET_HEADER_DATA_SIZE; - if (pckt_defrag(knet_h, inbuf, &len)) { - return; - } - len = len + KNET_HEADER_DATA_SIZE; - } +static int _decompress_data(knet_handle_t knet_h, uint8_t decompress_type, unsigned char *data, ssize_t *len, ssize_t header_size) +{ + int err = 0, stats_err = 0; - if (inbuf->khp_data_compress) { + if (decompress_type) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; - uint64_t compress_time; + uint64_t decompress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); - err = decompress(knet_h, inbuf->khp_data_compress, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, + err = decompress(knet_h, decompress_type, + data, + *len - header_size, knet_h->recv_from_links_buf_decompress, &decmp_outlen); + clock_gettime(CLOCK_MONOTONIC, &end_time); + timespec_diff(start_time, end_time, &decompress_time); + stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); - return; + return -1; } - clock_gettime(CLOCK_MONOTONIC, &end_time); - timespec_diff(start_time, end_time, &compress_time); - if (!err) { /* Collect stats */ - if (compress_time < knet_h->stats.rx_compress_time_min) { - knet_h->stats.rx_compress_time_min = compress_time; + if (decompress_time < knet_h->stats.rx_compress_time_min) { + knet_h->stats.rx_compress_time_min = decompress_time; } - if (compress_time > knet_h->stats.rx_compress_time_max) { - knet_h->stats.rx_compress_time_max = compress_time; + if (decompress_time > knet_h->stats.rx_compress_time_max) { + knet_h->stats.rx_compress_time_max = decompress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + - compress_time) / (knet_h->stats.rx_compressed_packets+1); + decompress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; - knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; + knet_h->stats.rx_compressed_size_bytes += *len - KNET_HEADER_SIZE; - memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); - len = decmp_outlen + KNET_HEADER_DATA_SIZE; + memmove(data, knet_h->recv_from_links_buf_decompress, decmp_outlen); + *len = decmp_outlen + header_size; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); - return; + return -1; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } + return 0; +} - if (!src_host->status.reachable) { - log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. 
Discarding packet.", src_host->host_id); - return; - } - - if (knet_h->enabled != 1) /* data forward is disabled */ - return; +static int _check_destination(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, ssize_t len, ssize_t header_size, int8_t *channel) +{ + knet_node_id_t dst_host_ids[KNET_MAX_HOST]; + size_t dst_host_ids_entries = 0; + int bcast = 1; + size_t host_idx; + int found = 0; if (knet_h->dst_host_filter_fn) { - size_t host_idx; - int found = 0; - bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, + data, + len - header_size, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, - &channel, + channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); - return; + return -1; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); - return; + return -1; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); - return; + return -1; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); - return; + return -1; } } } + return 0; +} - if (!knet_h->sockfd[channel].in_use) { - log_debug(knet_h, KNET_SUB_RX, - "received packet for channel %d but there is no local sock connected", - channel); - return; - } +static int _deliver_data(knet_handle_t knet_h, unsigned char *data, ssize_t len, ssize_t header_size, int8_t channel) +{ + struct iovec iov_out[1]; + ssize_t outlen = 0; - outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; - iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); + iov_out[0].iov_base = (void *) data + outlen; + iov_out[0].iov_len = len - (outlen + header_size); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); + return -1; + } + + if ((size_t)outlen != iov_out[0].iov_len) { + return -1; + } + + return 0; +} + +static void _process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time) +{ + int8_t channel; + uint8_t decompress_type = 0; + ssize_t header_size; + seq_num_t seq_num; + uint8_t frags, frag_seq; + unsigned char *data; + + if (_handle_data_stats(knet_h, src_link, len, decrypt_time) < 0) { + return; + } + + /* + * register host is sending data. Required to determine if we need + * to reset circular buffers. 
(see onwire_v1.c) + */ + src_host->got_data = 1; + + switch (inbuf->kh_version) { + case 1: + get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); + data = get_data_v1(knet_h, inbuf); + break; + default: + log_warn(knet_h, KNET_SUB_RX, "processing data onwire version %u not supported", inbuf->kh_version); + return; + break; + } + + if (!_seq_num_lookup(src_host, seq_num, 0, 0)) { + if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { + log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); + } return; } - if ((size_t)outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); + + if (frags > 1) { + /* + * len as received from the socket also includes extra stuff + * that the defrag code doesn't care about. So strip it + * here and readd only for repadding once we are done + * defragging + * + * the defrag code assumes that data packets have all the same size + * except the last one that might be smaller. + * + */ + len = len - header_size; + if (_pckt_defrag(knet_h, src_host, seq_num, data, &len, frags, frag_seq)) { + return; + } + len = len + header_size; } + + if (_decompress_data(knet_h, decompress_type, data, &len, header_size) < 0) { + return; + } + + if (!src_host->status.reachable) { + log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); + return; + } + + if (knet_h->enabled != 1) /* data forward is disabled */ + return; + + if (_check_destination(knet_h, inbuf, data, len, header_size, &channel) < 0) { + return; + } + + if (!knet_h->sockfd[channel].in_use) { + log_debug(knet_h, KNET_SUB_RX, + "received packet for channel %d but there is no local sock connected", + channel); + return; + } + + if (_deliver_data(knet_h, data, len, header_size, channel) < 0) { + return; + } + + _seq_num_set(src_host, seq_num, 0); } -static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) +static struct knet_header *_decrypt_packet(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len, uint64_t *decrypt_time) { - int savederrno = 0, stats_err = 0; + int try_decrypt = 0; + int i = 0; + struct timespec start_time; + struct timespec end_time; ssize_t outlen; - struct knet_host *src_host; - struct knet_link *src_link; - uint64_t decrypt_time = 0; - struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; - ssize_t len = msg->msg_len; - int try_decrypt = 0, i, found_link = 0; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); - return; + return NULL; } if (try_decrypt) { - struct timespec start_time; - struct timespec end_time; - clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, - len, + *len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { - return; + return NULL; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); - timespec_diff(start_time, end_time, &decrypt_time); + timespec_diff(start_time, end_time, decrypt_time); - len = outlen; + *len = 
outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } } + return inbuf; +} +static int _packet_checks(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t len) +{ if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); - return; + return -1; } if ((inbuf->kh_version > KNET_HEADER_ONWIRE_MAX_VER) && (inbuf->kh_version < KNET_HEADER_ONWIRE_MIN_VER)) { if (KNET_HEADER_ONWIRE_MAX_VER > 1 ) { log_debug(knet_h, KNET_SUB_RX, "Received packet version %u. current node only supports onwire version from %u to %u", inbuf->kh_version, KNET_HEADER_ONWIRE_MIN_VER, KNET_HEADER_ONWIRE_MAX_VER); } else { log_debug(knet_h, KNET_SUB_RX, "Received packet version %u. current node only supports %u", inbuf->kh_version, KNET_HEADER_ONWIRE_MAX_VER); } + return -1; + } + return 0; +} + +static void _handle_dynip(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, int sockfd, const struct knet_mmsghdr *msg) +{ + if (src_link->dynamic == KNET_LINK_DYNIP) { + if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { + log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", + src_host->host_id, src_link->link_id); + memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); + if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), + src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, + src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { + log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); + snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); + snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); + } else { + log_info(knet_h, KNET_SUB_RX, + "host: %u link: %u new connection established from: %s %s", + src_host->host_id, src_link->link_id, + src_link->status.dst_ipaddr, src_link->status.dst_port); + } + } + /* + * transport has already accepted the connection here + * otherwise we would not be receiving packets + */ + transport_link_dyn_connect(knet_h, sockfd, src_link); + } +} + +static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) +{ + int savederrno = 0, stats_err = 0; + struct knet_host *src_host; + struct knet_link *src_link; + uint64_t decrypt_time = 0; + struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; + ssize_t len = msg->msg_len; + int i, found_link = 0; + + inbuf = _decrypt_packet(knet_h, inbuf, &len, &decrypt_time); + if (!inbuf) { return; } + if (_packet_checks(knet_h, inbuf, len) < 0) { + return; + } + + /* + * determine source host + */ inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } + /* + * deteremine source link + */ if (inbuf->kh_type == KNET_HEADER_TYPE_PING) { switch (inbuf->kh_version) { case 1: - src_link = &src_host->link[inbuf->khp_ping_v1_link]; + src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version); return; + break; } - - if (src_link->dynamic == KNET_LINK_DYNIP) { - if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { - log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", - src_host->host_id, src_link->link_id); - memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, 
sizeof(struct sockaddr_storage)); - if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), - src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, - src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { - log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); - snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); - snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); - } else { - log_info(knet_h, KNET_SUB_RX, - "host: %u link: %u new connection established from: %s %s", - src_host->host_id, src_link->link_id, - src_link->status.dst_ipaddr, src_link->status.dst_port); - } - } - /* - * transport has already accepted the connection here - * otherwise we would not be receiving packets - */ - transport_link_dyn_connect(knet_h, sockfd, src_link); - } - } else { /* data packet */ + _handle_dynip(knet_h, src_host, src_link, sockfd, msg); + } else { /* all other packets */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } if (!found_link) { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: - process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time); + _process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time); break; case KNET_HEADER_TYPE_PING: process_ping(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PONG: process_pong(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ + break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud_reply(knet_h, src_link, inbuf); return; + break; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; + break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. 
Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ /* * processing incoming packets vs access lists */ if ((knet_h->use_access_lists) && (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); memset(src_port, 0, KNET_MAX_PORT_LEN); if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), src_ipaddr, KNET_MAX_HOST_LEN, src_port, KNET_MAX_PORT_LEN) < 0) { log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); } else { log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); } /* * continue processing the other packets */ continue; } } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); 
memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 7c1cfa88..8f4b291a 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -1,866 +1,969 @@ /* * Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" -#include #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_tx.h" #include "netutils.h" +#include "onwire_v1.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send) { int link_idx, msg_idx, sent_msgs, prev_sent, progress; int err = 0, savederrno = 0, locked = 0; unsigned int i; struct knet_mmsghdr *cur; struct knet_link *cur_link; for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { prev_sent = 0; progress = 1; locked = 0; cur_link = &dst_host->link[dst_host->active_links[link_idx]]; if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) { continue; } savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s", dst_host->host_id, cur_link->link_id, strerror(savederrno)); continue; } locked = 1; msg_idx = 0; while (msg_idx < msgs_to_send) { msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr; /* Cast for Linux/BSD compatibility */ for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) { cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len; } cur_link->status.stats.tx_data_packets++; msg_idx++; } retry: cur = &msg[prev_sent]; sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport), &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno); switch(err) { case -1: /* unrecoverable error */ cur_link->status.stats.tx_data_errors++; goto out_unlock; break; case 0: /* ignore error and continue */ break; case 1: /* retry to send those same data */ cur_link->status.stats.tx_data_retries++; goto retry; break; } prev_sent = prev_sent + sent_msgs; if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) { if ((sent_msgs) || (progress)) { if (sent_msgs) { progress = 1; } else { progress = 0; } #ifdef DEBUG log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); #endif goto retry; } if (!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } pthread_mutex_unlock(&cur_link->link_stats_mutex); locked = 0; } out_unlock: if (locked) { pthread_mutex_unlock(&cur_link->link_stats_mutex); } errno = savederrno; return err; } -static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t 
channel, int is_sync) +static int _dispatch_to_local(knet_handle_t knet_h, unsigned char *data, size_t inlen, int8_t channel) { - size_t outlen, frag_len; - struct knet_host *dst_host; - knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; - size_t dst_host_ids_entries_temp = 0; - knet_node_id_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct iovec iov_out[PCKT_FRAG_MAX][2]; - int iovcnt_out = 2; - uint8_t frag_idx; - unsigned int temp_data_mtu; - size_t host_idx; - int send_mcast = 0; - struct knet_header *inbuf; - int savederrno = 0; - int err = 0; - seq_num_t tx_seq_num; - struct knet_mmsghdr msg[PCKT_FRAG_MAX]; - int msgs_to_send, msg_idx; - unsigned int i; - int j; - int send_local = 0; - int data_compressed = 0; - size_t uncrypted_frag_size; - int stats_locked = 0, stats_err = 0; - - inbuf = knet_h->recv_from_sock_buf; - inbuf->kh_type = KNET_HEADER_TYPE_DATA; - inbuf->kh_version = knet_h->onwire_ver; - inbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; - inbuf->khp_data_frag_seq = 0; - inbuf->kh_node = htons(knet_h->host_id); - - if (knet_h->enabled != 1) { - log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); - savederrno = ECANCELED; - err = -1; - goto out_unlock; - } - - if (knet_h->dst_host_filter_fn) { - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - inlen, - KNET_NOTIFY_TX, - knet_h->host_id, - knet_h->host_id, - &channel, - dst_host_ids_temp, - &dst_host_ids_entries_temp); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); - savederrno = EFAULT; - err = -1; - goto out_unlock; - } + int err = 0, savederrno = 0; + const unsigned char *buf = data; + ssize_t buflen = inlen; + struct knet_link *local_link = knet_h->host_index[knet_h->host_id]->link; - if ((!bcast) && (!dst_host_ids_entries_temp)) { - log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - if ((!bcast) && - (dst_host_ids_entries_temp > KNET_MAX_HOST)) { - log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } +local_retry: + err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); + savederrno = errno; + if (err < 0) { + log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); + local_link->status.stats.tx_data_errors++; + goto out; } - - /* Send to localhost if appropriate and enabled */ - if (knet_h->has_loop_link) { - send_local = 0; - if (bcast) { - send_local = 1; - } else { - for (i=0; i< dst_host_ids_entries_temp; i++) { - if (dst_host_ids_temp[i] == knet_h->host_id) { - send_local = 1; - } - } - } - if (send_local) { - const unsigned char *buf = inbuf->khp_data_userdata; - ssize_t buflen = inlen; - struct knet_link *local_link; - - local_link = knet_h->host_index[knet_h->host_id]->link; - - local_retry: - err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); - if (err < 0) { - log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. 
error=%s\n", strerror(errno)); - local_link->status.stats.tx_data_errors++; - } - if (err > 0 && err < buflen) { - log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); - local_link->status.stats.tx_data_retries++; - buf += err; - buflen -= err; - goto local_retry; - } - if (err == buflen) { - local_link->status.stats.tx_data_packets++; - local_link->status.stats.tx_data_bytes += inlen; - } - } + if (err > 0 && err < buflen) { + log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); + local_link->status.stats.tx_data_retries++; + buf += err; + buflen -= err; + goto local_retry; } - - if (is_sync) { - if ((bcast) || - ((!bcast) && (dst_host_ids_entries_temp > 1))) { - log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); - savederrno = E2BIG; - err = -1; - goto out_unlock; - } + if (err == buflen) { + local_link->status.stats.tx_data_packets++; + local_link->status.stats.tx_data_bytes += inlen; } +out: + errno = savederrno; + return err; +} - /* - * check destinations hosts before spending time - * in fragmenting/encrypting packets to save - * time processing data for unreachable hosts. - * for unicast, also remap the destination data - * to skip unreachable hosts. - */ - - if (!bcast) { - dst_host_ids_entries = 0; - for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; - if (!dst_host) { - continue; - } - if (!(dst_host->host_id == knet_h->host_id && - knet_h->has_loop_link) && - dst_host->status.reachable) { - dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; - dst_host_ids_entries++; - } - } - if (!dst_host_ids_entries) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } else { - send_mcast = 0; - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (!(dst_host->host_id == knet_h->host_id && - knet_h->has_loop_link) && - dst_host->status.reachable) { - send_mcast = 1; - break; - } - } - if (!send_mcast) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } +static int _prep_tx_bufs(knet_handle_t knet_h, + struct knet_header *inbuf, uint8_t onwire_ver, + unsigned char *data, size_t inlen, + seq_num_t tx_seq_num, int8_t channel, int bcast, int data_compressed, + int *msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out) +{ + int err = 0, savederrno = 0; + unsigned int temp_data_mtu; if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." 
" Assuming minimum IPv4 MTU (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } + switch (onwire_ver) { + case 1: + prep_tx_bufs_v1(knet_h, inbuf, data, inlen, temp_data_mtu, tx_seq_num, channel, bcast, data_compressed, msgs_to_send, iov_out, iovcnt_out); + break; + default: /* this should never hit as filters are in place in the calling functions */ + log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); + savederrno = EINVAL; + err = -1; + goto out; + break; + } + +out: + errno = savederrno; + return err; +} + +static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed) +{ + int err = 0, savederrno = 0; + int stats_locked = 0, stats_err = 0; + size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; + struct timespec start_time; + struct timespec end_time; + uint64_t compress_time; + /* * compress data */ - if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { - size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; - struct timespec start_time; - struct timespec end_time; - uint64_t compress_time; - - clock_gettime(CLOCK_MONOTONIC, &start_time); - err = compress(knet_h, - (const unsigned char *)inbuf->khp_data_userdata, inlen, - knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); + if (knet_h->compress_model > 0) { + if (*inlen > knet_h->compress_threshold) { + clock_gettime(CLOCK_MONOTONIC, &start_time); + err = compress(knet_h, + data, *inlen, + knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); - savederrno = errno; + savederrno = errno; + clock_gettime(CLOCK_MONOTONIC, &end_time); + timespec_diff(start_time, end_time, &compress_time); - stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); - if (stats_err < 0) { - log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); - err = -1; - savederrno = stats_err; - goto out_unlock; - } - stats_locked = 1; - /* Collect stats */ - clock_gettime(CLOCK_MONOTONIC, &end_time); - timespec_diff(start_time, end_time, &compress_time); + stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); + if (stats_err < 0) { + log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); + err = -1; + savederrno = stats_err; + goto out; + } + stats_locked = 1; + /* Collect stats */ - if (compress_time < knet_h->stats.tx_compress_time_min) { - knet_h->stats.tx_compress_time_min = compress_time; - } - if (compress_time > knet_h->stats.tx_compress_time_max) { - knet_h->stats.tx_compress_time_max = compress_time; - } - knet_h->stats.tx_compress_time_ave = - (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets + - compress_time) / (knet_h->stats.tx_compressed_packets+1); - if (err < 0) { - knet_h->stats.tx_failed_to_compress++; - log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); - } else { - knet_h->stats.tx_compressed_packets++; - knet_h->stats.tx_compressed_original_bytes += inlen; - knet_h->stats.tx_compressed_size_bytes += cmp_outlen; - - if (cmp_outlen < inlen) { - memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); - inlen = cmp_outlen; - data_compressed = 1; + if (compress_time < knet_h->stats.tx_compress_time_min) { + knet_h->stats.tx_compress_time_min = compress_time; + } + if (compress_time > 
+
+static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed)
+{
+	int err = 0, savederrno = 0;
+	int stats_locked = 0, stats_err = 0;
+	size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
+	struct timespec start_time;
+	struct timespec end_time;
+	uint64_t compress_time;
+
 	/*
 	 * compress data
 	 */
-	if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
-		size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
-		struct timespec start_time;
-		struct timespec end_time;
-		uint64_t compress_time;
-
-		clock_gettime(CLOCK_MONOTONIC, &start_time);
-		err = compress(knet_h,
-			       (const unsigned char *)inbuf->khp_data_userdata, inlen,
-			       knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
+	if (knet_h->compress_model > 0) {
+		if (*inlen > knet_h->compress_threshold) {
+			clock_gettime(CLOCK_MONOTONIC, &start_time);
+			err = compress(knet_h,
+				       data, *inlen,
+				       knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
-		savederrno = errno;
+			savederrno = errno;
+			clock_gettime(CLOCK_MONOTONIC, &end_time);
+			timespec_diff(start_time, end_time, &compress_time);
-		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
-		if (stats_err < 0) {
-			log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
-			err = -1;
-			savederrno = stats_err;
-			goto out_unlock;
-		}
-		stats_locked = 1;
-		/* Collect stats */
-		clock_gettime(CLOCK_MONOTONIC, &end_time);
-		timespec_diff(start_time, end_time, &compress_time);
+			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
+			if (stats_err < 0) {
+				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
+				err = -1;
+				savederrno = stats_err;
+				goto out;
+			}
+			stats_locked = 1;
+			/* Collect stats */
-		if (compress_time < knet_h->stats.tx_compress_time_min) {
-			knet_h->stats.tx_compress_time_min = compress_time;
-		}
-		if (compress_time > knet_h->stats.tx_compress_time_max) {
-			knet_h->stats.tx_compress_time_max = compress_time;
-		}
-		knet_h->stats.tx_compress_time_ave =
-			(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
-					     compress_time) / (knet_h->stats.tx_compressed_packets+1);
-		if (err < 0) {
-			knet_h->stats.tx_failed_to_compress++;
-			log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
-		} else {
-			knet_h->stats.tx_compressed_packets++;
-			knet_h->stats.tx_compressed_original_bytes += inlen;
-			knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
-
-			if (cmp_outlen < inlen) {
-				memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
-				inlen = cmp_outlen;
-				data_compressed = 1;
+			if (compress_time < knet_h->stats.tx_compress_time_min) {
+				knet_h->stats.tx_compress_time_min = compress_time;
+			}
+			if (compress_time > knet_h->stats.tx_compress_time_max) {
+				knet_h->stats.tx_compress_time_max = compress_time;
+			}
+			knet_h->stats.tx_compress_time_ave =
+				(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
+						     compress_time) / (knet_h->stats.tx_compressed_packets+1);
+			if (err < 0) {
+				knet_h->stats.tx_failed_to_compress++;
+				log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
 			} else {
-				knet_h->stats.tx_unable_to_compress++;
+				knet_h->stats.tx_compressed_packets++;
+				knet_h->stats.tx_compressed_original_bytes += *inlen;
+				knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
+
+				if (cmp_outlen < *inlen) {
+					memmove(data, knet_h->send_to_links_buf_compress, cmp_outlen);
+					*inlen = cmp_outlen;
+					*data_compressed = 1;
+				} else {
+					knet_h->stats.tx_unable_to_compress++;
+				}
 			}
 		}
-	}
-	if (!stats_locked) {
-		stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
-		if (stats_err < 0) {
-			log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
-			err = -1;
-			savederrno = stats_err;
-			goto out_unlock;
+		if (!*data_compressed) {
+			if (!stats_locked) {
+				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
+				if (stats_err < 0) {
+					log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
+					err = -1;
+					savederrno = stats_err;
+					goto out;
+				}
+				stats_locked = 1;
+			}
+			knet_h->stats.tx_uncompressed_packets++;
+		}
+		if (stats_locked) {
+			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
 		}
 	}
-	if (knet_h->compress_model > 0 && !data_compressed) {
-		knet_h->stats.tx_uncompressed_packets++;
-	}
-	pthread_mutex_unlock(&knet_h->handle_stats_mutex);
-	stats_locked = 0;
-	/*
-	 * prepare the outgoing buffers
-	 */
+out:
+	errno = savederrno;
+	return err;
+}
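_compress_data() only attempts compression above the configured threshold and only keeps the result when it is actually smaller than the input. A compilable sketch of that policy; try_compress() is a stand-in stub (not knet's model-specific compress()) so the example links on its own:

#include <stddef.h>
#include <string.h>

/* placeholder codec: always reports "no gain" so the sample is self-contained */
static int try_compress(const unsigned char *in, size_t inlen,
			unsigned char *out, size_t *outlen)
{
	(void)in; (void)out;
	*outlen = inlen;			/* pretend output equals input size */
	return 0;
}

/* returns 1 if data was replaced by a smaller compressed copy, 0 otherwise */
static int maybe_compress(unsigned char *data, size_t *inlen,
			  unsigned char *scratch, size_t threshold)
{
	size_t outlen = 0;

	if (*inlen <= threshold) {
		return 0;			/* too small to bother */
	}
	if (try_compress(data, *inlen, scratch, &outlen) < 0) {
		return 0;			/* send uncompressed on failure */
	}
	if (outlen >= *inlen) {
		return 0;			/* compression did not help */
	}
	memcpy(data, scratch, outlen);		/* keep the smaller payload */
	*inlen = outlen;
	return 1;				/* caller marks the header as compressed */
}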
-	frag_len = inlen;
-	frag_idx = 0;
+static int _encrypt_bufs(knet_handle_t knet_h, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int *iovcnt_out)
+{
+	int err = 0, savederrno = 0, stats_err = 0;
+	struct timespec start_time;
+	struct timespec end_time;
+	uint64_t crypt_time;
+	uint8_t frag_idx = 0;
+	size_t outlen, uncrypted_frag_size;
+	int j;
-	inbuf->khp_data_bcast = bcast;
-	inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
-	inbuf->khp_data_channel = channel;
-	if (data_compressed) {
-		inbuf->khp_data_compress = knet_h->compress_model;
-	} else {
-		inbuf->khp_data_compress = 0;
+	if (knet_h->crypto_in_use_config) {
+		while (frag_idx < msgs_to_send) {
+			clock_gettime(CLOCK_MONOTONIC, &start_time);
+			if (crypto_encrypt_and_signv(
+				knet_h,
+				iov_out[frag_idx], *iovcnt_out,
+				knet_h->send_to_links_buf_crypt[frag_idx],
+				(ssize_t *)&outlen) < 0) {
+				log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
+				savederrno = ECHILD;
+				err = -1;
+				goto out;
+			}
+			clock_gettime(CLOCK_MONOTONIC, &end_time);
+			timespec_diff(start_time, end_time, &crypt_time);
+
+			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
+			if (stats_err < 0) {
+				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
+				err = -1;
+				savederrno = stats_err;
+				goto out;
+			}
+
+			if (crypt_time < knet_h->stats.tx_crypt_time_min) {
+				knet_h->stats.tx_crypt_time_min = crypt_time;
+			}
+			if (crypt_time > knet_h->stats.tx_crypt_time_max) {
+				knet_h->stats.tx_crypt_time_max = crypt_time;
+			}
+			knet_h->stats.tx_crypt_time_ave =
+				(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
+				 crypt_time) / (knet_h->stats.tx_crypt_packets+1);
+
+			uncrypted_frag_size = 0;
+			for (j=0; j < *iovcnt_out; j++) {
+				uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
+			}
+			knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
+			knet_h->stats.tx_crypt_packets++;
+			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
+
+			iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
+			iov_out[frag_idx][0].iov_len = outlen;
+			frag_idx++;
+		}
+		*iovcnt_out = 1;
 	}
+out:
+	errno = savederrno;
+	return err;
+}
-	if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
+static int _get_tx_seq_num(knet_handle_t knet_h, seq_num_t *tx_seq_num)
+{
+	int savederrno = 0;
+
+	savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex);
+	if (savederrno) {
 		log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
-		goto out_unlock;
+		errno = savederrno;
+		return -1;
 	}
+
 	knet_h->tx_seq_num++;
 	/*
 	 * force seq_num 0 to detect a node that has crashed and rejoining
 	 * the knet instance. seq_num 0 will clear the buffers in the RX
 	 * thread
 	 */
 	if (knet_h->tx_seq_num == 0) {
 		knet_h->tx_seq_num++;
 	}
 	/*
 	 * cache the value in locked context
 	 */
-	tx_seq_num = knet_h->tx_seq_num;
-	inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
+	*tx_seq_num = knet_h->tx_seq_num;
 	pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);

 	/*
 	 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
 	 * pckts.
 	 * this solves 2 problems:
 	 * 1) on TX socket overloads we generate extra pings to keep links alive
 	 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
 	 *    node 3+ will be able to keep in sync on the TX seq_num even without
 	 *    receiving traffic or pings in between. This avoids issues with
 	 *    rollover of the circular buffer
 	 */
-	if (tx_seq_num % (SEQ_MAX / 8) == 0) {
+	if (*tx_seq_num % (SEQ_MAX / 8) == 0) {
 		_send_pings(knet_h, 0);
 	}
+	return 0;
+}
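_get_tx_seq_num() reserves sequence number 0 as the "node restarted" marker, so the increment has to skip it when the 16-bit counter wraps. A self-contained illustration, assuming only that seq_num_t is uint16_t as declared in onwire.h:

#include <assert.h>
#include <stdint.h>

typedef uint16_t seq_num_t;

static seq_num_t next_seq(seq_num_t cur)
{
	cur++;			/* uint16_t wraps 65535 -> 0 naturally */
	if (cur == 0) {
		cur++;		/* 0 is reserved to signal a rejoining node */
	}
	return cur;
}

int main(void)
{
	assert(next_seq(1) == 2);
	assert(next_seq(65535) == 1);	/* wraparound skips the reserved 0 */
	return 0;
}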
-	if (inbuf->khp_data_frag_num > 1) {
-		while (frag_idx < inbuf->khp_data_frag_num) {
-			/*
-			 * set the iov_base
-			 */
-			iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
-			iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
-			iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
-			/*
-			 * set the len
-			 */
-			if (frag_len > temp_data_mtu) {
-				iov_out[frag_idx][1].iov_len = temp_data_mtu;
-			} else {
-				iov_out[frag_idx][1].iov_len = frag_len;
-			}
+static int _get_data_dests(knet_handle_t knet_h, unsigned char* data, size_t inlen,
+			   int8_t *channel, int *bcast, int *send_local,
+			   knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries,
+			   int is_sync)
+{
+	int err = 0, savederrno = 0;
+	knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];	/* store destinations from filter */
+	size_t dst_host_ids_entries_temp = 0;
+	size_t dst_host_ids_entries_temp2 = 0;			/* workaround gcc here */
+	struct knet_host *dst_host;
+	size_t host_idx;
-			/*
-			 * copy the frag info on all buffers
-			 */
-			knet_h->send_to_links_buf[frag_idx]->kh_version = inbuf->kh_version;
-			knet_h->send_to_links_buf[frag_idx]->kh_max_ver = inbuf->kh_max_ver;
-			knet_h->send_to_links_buf[frag_idx]->kh_node = htons(knet_h->host_id);
-			knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_frag_seq = frag_idx + 1;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
-			knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
-
-			frag_len = frag_len - temp_data_mtu;
-			frag_idx++;
+	if (knet_h->dst_host_filter_fn) {
+		*bcast = knet_h->dst_host_filter_fn(
+				knet_h->dst_host_filter_fn_private_data,
+				data,
+				inlen,
+				KNET_NOTIFY_TX,
+				knet_h->host_id,
+				knet_h->host_id,
+				channel,
+				dst_host_ids_temp,
+				&dst_host_ids_entries_temp);
+		if (*bcast < 0) {
+			log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", *bcast);
+			savederrno = EFAULT;
+			err = -1;
+			goto out;
 		}
-		iovcnt_out = 2;
-	} else {
-		iov_out[frag_idx][0].iov_base = (void *)inbuf;
-		iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
-		iovcnt_out = 1;
-	}
-	if (knet_h->crypto_in_use_config) {
-		struct timespec start_time;
-		struct timespec end_time;
-		uint64_t crypt_time;
+		if ((!*bcast) && (!dst_host_ids_entries_temp)) {
+			log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
+			savederrno = EINVAL;
+			err = -1;
+			goto out;
+		}
-		frag_idx = 0;
-		while (frag_idx < inbuf->khp_data_frag_num) {
-			clock_gettime(CLOCK_MONOTONIC, &start_time);
-			if (crypto_encrypt_and_signv(
-				knet_h,
-				iov_out[frag_idx], iovcnt_out,
-				knet_h->send_to_links_buf_crypt[frag_idx],
-				(ssize_t *)&outlen) < 0) {
-				log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
-				savederrno = ECHILD;
-				err = -1;
-				goto out_unlock;
-			}
-			clock_gettime(CLOCK_MONOTONIC, &end_time);
-			timespec_diff(start_time, end_time, &crypt_time);
+		if ((!*bcast) &&
+		    (dst_host_ids_entries_temp > KNET_MAX_HOST)) {
+			log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
+			savederrno = EINVAL;
+			err = -1;
+			goto out;
+		}
-			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
-			if (stats_err < 0) {
-				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
+		if (is_sync) {
+			if ((*bcast) ||
+			    ((!*bcast) && (dst_host_ids_entries_temp > 1))) {
+				log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
+				savederrno = E2BIG;
 				err = -1;
-				savederrno = stats_err;
-				goto out_unlock;
+				goto out;
 			}
+		}
+	}
-			if (crypt_time < knet_h->stats.tx_crypt_time_min) {
-				knet_h->stats.tx_crypt_time_min = crypt_time;
+	/*
+	 * check destination hosts before spending time
+	 * in fragmenting/encrypting packets to save
+	 * time processing data for unreachable hosts.
+	 * for unicast, also remap the destination data
+	 * to skip unreachable hosts.
+	 */
+
+	if (!*bcast) {
+		*dst_host_ids_entries = dst_host_ids_entries_temp2;
+		for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
+			dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
+			if (!dst_host) {
+				continue;
 			}
-			if (crypt_time > knet_h->stats.tx_crypt_time_max) {
-				knet_h->stats.tx_crypt_time_max = crypt_time;
+			if ((dst_host->host_id == knet_h->host_id) &&
+			    (knet_h->has_loop_link)) {
+				*send_local = 1;
 			}
-			knet_h->stats.tx_crypt_time_ave =
-				(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
-				 crypt_time) / (knet_h->stats.tx_crypt_packets+1);
-
-			uncrypted_frag_size = 0;
-			for (j=0; j < iovcnt_out; j++) {
-				uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
+			if (!((dst_host->host_id == knet_h->host_id) &&
+			      (knet_h->has_loop_link)) &&
+			    dst_host->status.reachable) {
+				dst_host_ids[dst_host_ids_entries_temp2] = dst_host_ids_temp[host_idx];
+				dst_host_ids_entries_temp2++;
 			}
-			knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
-			knet_h->stats.tx_crypt_packets++;
-			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
-
-			iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
-			iov_out[frag_idx][0].iov_len = outlen;
-			frag_idx++;
 		}
-		iovcnt_out = 1;
+		if ((!dst_host_ids_entries_temp2) && (!*send_local)) {
+			savederrno = EHOSTDOWN;
+			err = -1;
+			goto out;
+		}
+		*dst_host_ids_entries = dst_host_ids_entries_temp2;
+	} else {
+		*bcast = 0;
+		*send_local = 0;
+		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
+			if ((dst_host->host_id == knet_h->host_id) &&
+			    (knet_h->has_loop_link)) {
+				*send_local = 1;
+			}
+			if (!(dst_host->host_id == knet_h->host_id &&
+			      knet_h->has_loop_link) &&
+			    dst_host->status.reachable) {
+				*bcast = 1;
+			}
+		}
+		if ((!*bcast) && (!*send_local)) {
+			savederrno = EHOSTDOWN;
+			err = -1;
+			goto out;
+		}
 	}
-	memset(&msg, 0, sizeof(msg));
+out:
+	errno = savederrno;
+	return err;
+}
+
+static int _prep_and_send_msgs(knet_handle_t knet_h, int bcast, knet_node_id_t *dst_host_ids, size_t dst_host_ids_entries, int msgs_to_send, struct iovec iov_out[PCKT_FRAG_MAX][2], int iovcnt_out)
+{
+	int err = 0, savederrno = 0;
+	struct knet_host *dst_host;
+	struct knet_mmsghdr msg[PCKT_FRAG_MAX];
+	int msg_idx;
+	size_t host_idx;
-	msgs_to_send = inbuf->khp_data_frag_num;
+	memset(&msg, 0, sizeof(msg));
 	msg_idx = 0;
 	while (msg_idx < msgs_to_send) {
 		msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
 		msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
 		msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
 		msg_idx++;
 	}
 	if (!bcast) {
 		for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
 			dst_host = knet_h->host_index[dst_host_ids[host_idx]];
 			err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 			savederrno = errno;
 			if (err) {
-				goto out_unlock;
+				goto out;
 			}
 		}
 	} else {
 		for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
 			if (dst_host->status.reachable) {
 				err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
 				savederrno = errno;
 				if (err) {
-					goto out_unlock;
+					goto out;
 				}
 			}
 		}
 	}
-out_unlock:
+out:
 	errno = savederrno;
 	return err;
 }
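_prep_and_send_msgs() points each outgoing message at a two-entry iovec: the fragment header first, then that fragment's slice of user data (encryption collapses the pair into a single iovec beforehand). A sketch of the pairing with simplified stand-in types, not the knet structures:

#include <stddef.h>
#include <sys/uio.h>

struct frag_hdr { unsigned char bytes[32]; };	/* placeholder header */

static void fill_iov(struct iovec iov[2], struct frag_hdr *hdr,
		     unsigned char *payload, size_t payload_len)
{
	iov[0].iov_base = hdr;			/* fragment header first */
	iov[0].iov_len  = sizeof(*hdr);
	iov[1].iov_base = payload;		/* then this fragment's data slice */
	iov[1].iov_len  = payload_len;
}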
-static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel)
+static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, uint8_t onwire_ver, int is_sync)
+{
+	int err = 0, savederrno = 0;
+	struct knet_header *inbuf = knet_h->recv_from_sock_buf;	/* all TX packets are stored here regardless of the onwire */
+	unsigned char *data;					/* onwire neutral pointer to data to send */
+	int data_compressed = 0;				/* track data compression to fill the header */
+	seq_num_t tx_seq_num;
+
+	int bcast = 1;						/* assume all packets are to be broadcast unless filter tells us differently */
+	knet_node_id_t dst_host_ids[KNET_MAX_HOST];		/* store destinations from filter */
+	size_t dst_host_ids_entries = 0;
+	int send_local = 0;					/* send packets to loopback */
+
+	struct iovec iov_out[PCKT_FRAG_MAX][2];
+	int iovcnt_out = 2;
+	int msgs_to_send = 0;
+
+	if (knet_h->enabled != 1) {
+		log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
+		savederrno = ECANCELED;
+		err = -1;
+		goto out;
+	}
+
+	switch (onwire_ver) {
+		case 1:
+			data = get_data_v1(knet_h, inbuf);
+			break;
+		default: /* this should never hit as filters are in place in the calling functions */
+			log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver);
+			savederrno = EINVAL;
+			err = -1;
+			goto out;
+			break;
+	}
+
+	err = _get_data_dests(knet_h, data, inlen,
+			      &channel, &bcast, &send_local,
+			      dst_host_ids, &dst_host_ids_entries,
+			      is_sync);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+	/* Send to localhost if appropriate and enabled */
+	if (send_local) {
+		err = _dispatch_to_local(knet_h, data, inlen, channel);
+		if (err < 0) {
+			savederrno = errno;
+			goto out;
+		}
+	}
+
+	err = _compress_data(knet_h, data, &inlen, &data_compressed);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+	err = _get_tx_seq_num(knet_h, &tx_seq_num);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+	err = _prep_tx_bufs(knet_h, inbuf, onwire_ver, data, inlen, tx_seq_num, channel, bcast, data_compressed, &msgs_to_send, iov_out, &iovcnt_out);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+	err = _encrypt_bufs(knet_h, msgs_to_send, iov_out, &iovcnt_out);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+	err = _prep_and_send_msgs(knet_h, bcast, dst_host_ids, dst_host_ids_entries, msgs_to_send, iov_out, iovcnt_out);
+	if (err < 0) {
+		savederrno = errno;
+		goto out;
+	}
+
+out:
+	errno = savederrno;
+	return err;
+}
+
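_parse_recv_from_sock() relies on a per-version accessor to locate the user data inside the shared TX buffer. The real get_data_v1() is defined elsewhere in this patch; a hypothetical reconstruction based on the v1 payload macros in onwire.h might look like this (illustration only, not the actual helper):

#include "onwire.h"

static unsigned char *get_data_v1_sketch(knet_handle_t knet_h, struct knet_header *inbuf)
{
	(void)knet_h;				/* handle unused in this sketch */
	return inbuf->khp_data_v1_userdata;	/* kh_payload.khp_data_v1.khp_data_userdata */
}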
Discarding", sockfd); return; } } if (inlen == 0) { savederrno = 0; docallback = 1; } else if (inlen < 0) { struct epoll_event ev; savederrno = errno; docallback = 1; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } else { - _parse_recv_from_sock(knet_h, inlen, channel, 0); + _parse_recv_from_sock(knet_h, inlen, channel, onwire_ver, 0); } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; int i, nev; int flush, flush_queue_limit; int8_t channel; + uint8_t onwire_ver; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); flush_queue_limit = 0; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000); flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { /* * ideally we want to communicate that we are done flushing * the queue when we have an epoll timeout event */ if (flush == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED); flush_queue_limit = 0; } continue; } /* * fall back in case the TX sockets will continue receive traffic * and we do not hit an epoll timeout. * * allow up to a 100 loops to flush queues, then we give up. * there might be more clean ways to do it by checking the buffer queue * on each socket, but we have tons of sockets and calculations can go wrong. * Also, why would you disable data forwarding and still send packets? 

 void *_handle_send_to_links_thread(void *data)
 {
 	knet_handle_t knet_h = (knet_handle_t) data;
 	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
 	int i, nev;
 	int flush, flush_queue_limit;
 	int8_t channel;
+	uint8_t onwire_ver;

 	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

 	memset(&events, 0, sizeof(events));

 	flush_queue_limit = 0;

 	while (!shutdown_in_progress(knet_h)) {
 		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);

 		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

 		/*
 		 * we use timeout to detect if thread is shutting down
 		 */
 		if (nev == 0) {
 			/*
 			 * ideally we want to communicate that we are done flushing
 			 * the queue when we have an epoll timeout event
 			 */
 			if (flush == KNET_THREAD_QUEUE_FLUSH) {
 				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
 				flush_queue_limit = 0;
 			}
 			continue;
 		}

 		/*
 		 * fall back in case the TX sockets continue to receive traffic
 		 * and we do not hit an epoll timeout.
 		 *
 		 * allow up to 100 loops to flush queues, then we give up.
 		 * there might be cleaner ways to do it by checking the buffer queue
 		 * on each socket, but we have tons of sockets and calculations can go wrong.
 		 * Also, why would you disable data forwarding and still send packets?
 		 */
 		if (flush == KNET_THREAD_QUEUE_FLUSH) {
 			if (flush_queue_limit >= 100) {
 				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
 				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
 				flush_queue_limit = 0;
 			} else {
 				flush_queue_limit++;
 			}
 		} else {
 			flush_queue_limit = 0;
 		}

 		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
 			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
 			continue;
 		}

+		if (pthread_mutex_lock(&knet_h->onwire_mutex)) {
+			log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock");
+			goto out_unlock;
+		}
+		onwire_ver = knet_h->onwire_ver;
+		pthread_mutex_unlock(&knet_h->onwire_mutex);
+
 		for (i = 0; i < nev; i++) {
 			for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
 				if ((knet_h->sockfd[channel].in_use) &&
 				    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
 					break;
 				}
 			}
 			if (channel >= KNET_DATAFD_MAX) {
 				log_debug(knet_h, KNET_SUB_TX, "No available channels");
 				continue; /* channel not found */
 			}
 			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
 				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
 				continue;
 			}
-			_handle_send_to_links(knet_h, events[i].data.fd, channel);
+			_handle_send_to_links(knet_h, events[i].data.fd, onwire_ver, channel);
 			pthread_mutex_unlock(&knet_h->tx_mutex);
 		}
-
+out_unlock:
 		pthread_rwlock_unlock(&knet_h->global_rwlock);
 	}

 	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

 	return NULL;
 }

 int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0, err = 0;
+	uint8_t onwire_ver;

 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}

 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}

 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out;
 	}

+	if (pthread_mutex_lock(&knet_h->onwire_mutex)) {
+		log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock");
+		goto out;
+	}
+	onwire_ver = knet_h->onwire_ver;
+	pthread_mutex_unlock(&knet_h->onwire_mutex);
+
 	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno));
 		err = -1;
 		goto out;
 	}

-	memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
-	err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
+	switch (onwire_ver) {
+		case 1:
+			memmove(get_data_v1(knet_h, knet_h->recv_from_sock_buf), buff, buff_len);
+			break;
+		default:
+			log_warn(knet_h, KNET_SUB_TX, "preparing sync data onwire version %u not supported", onwire_ver);
+			goto out_tx;
+			break;
+	}
+
+	err = _parse_recv_from_sock(knet_h, buff_len, channel, onwire_ver, 1);
 	savederrno = errno;

+out_tx:
 	pthread_mutex_unlock(&knet_h->tx_mutex);
-
 out:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);

 	errno = err ? savederrno : 0;
 	return err;
 }
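Both TX entry points (the thread loop and knet_send_sync()) copy onwire_ver to a local under onwire_mutex and then work from the snapshot, so a concurrent version bump cannot change the packet format mid-send. The pattern in isolation; the variable and function names here are illustrative:

#include <pthread.h>
#include <stdint.h>

static pthread_mutex_t ver_mutex = PTHREAD_MUTEX_INITIALIZER;
static uint8_t shared_onwire_ver = 1;

static int snapshot_ver(uint8_t *out)
{
	if (pthread_mutex_lock(&ver_mutex)) {
		return -1;		/* caller decides how to bail out */
	}
	*out = shared_onwire_ver;	/* copy while holding the lock */
	pthread_mutex_unlock(&ver_mutex);
	return 0;			/* work from *out, not the shared value */
}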

 ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0;
 	ssize_t err = 0;
 	struct iovec iov_out[1];

 	if (!knet_h) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff == NULL) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff_len <= 0) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (buff_len > KNET_MAX_PACKET_SIZE) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (channel < 0) {
 		errno = EINVAL;
 		return -1;
 	}

 	if (channel >= KNET_DATAFD_MAX) {
 		errno = EINVAL;
 		return -1;
 	}

 	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno));
 		errno = savederrno;
 		return -1;
 	}

 	if (!knet_h->sockfd[channel].in_use) {
 		savederrno = EINVAL;
 		err = -1;
 		goto out_unlock;
 	}

 	memset(iov_out, 0, sizeof(iov_out));

 	iov_out[0].iov_base = (void *)buff;
 	iov_out[0].iov_len = buff_len;

 	err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
 	savederrno = errno;

 out_unlock:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);

 	errno = err ? savederrno : 0;
 	return err;
 }
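For contrast with knet_send_sync(), knet_send() above only writes the payload into the channel datafd and lets the TX thread pick it up asynchronously. A hypothetical caller, with handle and channel setup omitted; send_hello() is not part of the library:

#include <libknet.h>
#include <string.h>

static int send_hello(knet_handle_t h, int8_t channel)
{
	const char msg[] = "hello";

	if (knet_send(h, msg, strlen(msg), channel) < 0) {
		return -1;	/* async path: datafd write failed */
	}
	/* sync path runs the whole TX pipeline inline; per the code above it
	 * only supports unicast traffic to exactly one destination */
	return knet_send_sync(h, msg, strlen(msg), channel);
}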