diff --git a/libknet/links_acl_ip.c b/libknet/links_acl_ip.c
index 0f269ef1..0f9c0f7e 100644
--- a/libknet/links_acl_ip.c
+++ b/libknet/links_acl_ip.c
@@ -1,310 +1,302 @@
/*
* Copyright (C) 2016-2020 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include "internals.h"
+#include "netutils.h"
#include "logging.h"
#include "transports.h"
#include "links_acl.h"
#include "links_acl_ip.h"
struct ip_acl_match_entry {
check_type_t type;
check_acceptreject_t acceptreject;
struct sockaddr_storage addr1; /* Actual IP address, mask top or low IP */
struct sockaddr_storage addr2; /* high IP address or address bitmask */
struct ip_acl_match_entry *next;
};
-/*
- * s6_addr32 is not defined in BSD userland, only kernel.
- * definition is the same as linux and it works fine for
- * what we need.
- */
-#ifndef s6_addr32
-#define s6_addr32 __u6_addr.__u6_addr32
-#endif
-
/*
* IPv4: see if the address we have matches the current match entry
*/
static int ip_matches_v4(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry)
{
struct sockaddr_in *ip_to_check;
struct sockaddr_in *match1;
struct sockaddr_in *match2;
ip_to_check = (struct sockaddr_in *)checkip;
match1 = (struct sockaddr_in *)&match_entry->addr1;
match2 = (struct sockaddr_in *)&match_entry->addr2;
switch(match_entry->type) {
case CHECK_TYPE_ADDRESS:
if (ip_to_check->sin_addr.s_addr == match1->sin_addr.s_addr)
return 1;
break;
case CHECK_TYPE_MASK:
if ((ip_to_check->sin_addr.s_addr & match2->sin_addr.s_addr) ==
match1->sin_addr.s_addr)
return 1;
break;
case CHECK_TYPE_RANGE:
if ((ntohl(ip_to_check->sin_addr.s_addr) >= ntohl(match1->sin_addr.s_addr)) &&
(ntohl(ip_to_check->sin_addr.s_addr) <= ntohl(match2->sin_addr.s_addr)))
return 1;
break;
}
return 0;
}
/*
* Compare two IPv6 addresses
*/
static int ip6addr_cmp(struct in6_addr *a, struct in6_addr *b)
{
uint64_t a_high, a_low;
uint64_t b_high, b_low;
a_high = ((uint64_t)htonl(a->s6_addr32[0]) << 32) | (uint64_t)htonl(a->s6_addr32[1]);
a_low = ((uint64_t)htonl(a->s6_addr32[2]) << 32) | (uint64_t)htonl(a->s6_addr32[3]);
b_high = ((uint64_t)htonl(b->s6_addr32[0]) << 32) | (uint64_t)htonl(b->s6_addr32[1]);
b_low = ((uint64_t)htonl(b->s6_addr32[2]) << 32) | (uint64_t)htonl(b->s6_addr32[3]);
if (a_high > b_high)
return 1;
if (a_high < b_high)
return -1;
if (a_low > b_low)
return 1;
if (a_low < b_low)
return -1;
return 0;
}
/*
* IPv6: see if the address we have matches the current match entry
*/
static int ip_matches_v6(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry)
{
struct sockaddr_in6 *ip_to_check;
struct sockaddr_in6 *match1;
struct sockaddr_in6 *match2;
int i;
ip_to_check = (struct sockaddr_in6 *)checkip;
match1 = (struct sockaddr_in6 *)&match_entry->addr1;
match2 = (struct sockaddr_in6 *)&match_entry->addr2;
switch(match_entry->type) {
case CHECK_TYPE_ADDRESS:
if (!memcmp(ip_to_check->sin6_addr.s6_addr32, match1->sin6_addr.s6_addr32, sizeof(struct in6_addr)))
return 1;
break;
case CHECK_TYPE_MASK:
/*
* Note that this little loop will quit early if there is a non-match so the
* comparison might look backwards compared to the IPv4 one
*/
for (i=sizeof(struct in6_addr)/4-1; i>=0; i--) {
if ((ip_to_check->sin6_addr.s6_addr32[i] & match2->sin6_addr.s6_addr32[i]) !=
match1->sin6_addr.s6_addr32[i])
return 0;
}
return 1;
case CHECK_TYPE_RANGE:
if ((ip6addr_cmp(&ip_to_check->sin6_addr, &match1->sin6_addr) >= 0) &&
(ip6addr_cmp(&ip_to_check->sin6_addr, &match2->sin6_addr) <= 0))
return 1;
break;
}
return 0;
}
int ipcheck_validate(void *fd_tracker_match_entry_head, struct sockaddr_storage *checkip)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *match_entry = *match_entry_head;
int (*match_fn)(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry);
if (checkip->ss_family == AF_INET) {
match_fn = ip_matches_v4;
} else {
match_fn = ip_matches_v6;
}
while (match_entry) {
if (match_fn(checkip, match_entry)) {
if (match_entry->acceptreject == CHECK_ACCEPT)
return 1;
else
return 0;
}
match_entry = match_entry->next;
}
return 0; /* Default reject */
}
/*
* Routines to manipulate access lists
*/
void ipcheck_rmall(void *fd_tracker_match_entry_head)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *next_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
while (match_entry) {
next_match_entry = match_entry->next;
free(match_entry);
match_entry = next_match_entry;
}
*match_entry_head = NULL;
}
static struct ip_acl_match_entry *ipcheck_findmatch(struct ip_acl_match_entry **match_entry_head,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry *match_entry = *match_entry_head;
while (match_entry) {
if ((!memcmp(&match_entry->addr1, ss1, sizeof(struct sockaddr_storage))) &&
(!memcmp(&match_entry->addr2, ss2, sizeof(struct sockaddr_storage))) &&
(match_entry->type == type) &&
(match_entry->acceptreject == acceptreject)) {
return match_entry;
}
match_entry = match_entry->next;
}
return NULL;
}
int ipcheck_rmip(void *fd_tracker_match_entry_head,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *next_match_entry = NULL;
struct ip_acl_match_entry *rm_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
rm_match_entry = ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject);
if (!rm_match_entry) {
errno = ENOENT;
return -1;
}
while (match_entry) {
next_match_entry = match_entry->next;
/*
* we are removing the list head, be careful
*/
if (rm_match_entry == match_entry) {
*match_entry_head = next_match_entry;
free(match_entry);
break;
}
/*
* the next one is the one we need to remove
*/
if (rm_match_entry == next_match_entry) {
match_entry->next = next_match_entry->next;
free(next_match_entry);
break;
}
match_entry = next_match_entry;
}
return 0;
}
int ipcheck_addip(void *fd_tracker_match_entry_head, int index,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *new_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
int i = 0;
if (ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject) != NULL) {
errno = EEXIST;
return -1;
}
new_match_entry = malloc(sizeof(struct ip_acl_match_entry));
if (!new_match_entry) {
return -1;
}
memmove(&new_match_entry->addr1, ss1, sizeof(struct sockaddr_storage));
memmove(&new_match_entry->addr2, ss2, sizeof(struct sockaddr_storage));
new_match_entry->type = type;
new_match_entry->acceptreject = acceptreject;
new_match_entry->next = NULL;
if (match_entry) {
/*
* special case for index 0, since we need to update
* the head of the list
*/
if (index == 0) {
*match_entry_head = new_match_entry;
new_match_entry->next = match_entry;
} else {
/*
* find the end of the list or stop at "index"
*/
while (match_entry->next) {
match_entry = match_entry->next;
if (i == index) {
break;
}
i++;
}
/*
* insert if there are more entries in the list
*/
if (match_entry->next) {
new_match_entry->next = match_entry->next;
}
/*
* add if we are at the end
*/
match_entry->next = new_match_entry;
}
} else {
/*
* first entry in the list
*/
*match_entry_head = new_match_entry;
}
return 0;
}
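The ACL logic above supports three match types per entry: an exact address, a network/mask pair (addr1 holds the network, addr2 the mask), and an inclusive low/high range. A minimal standalone sketch of the IPv4 mask semantics, using only POSIX headers; v4_mask_match() is a hypothetical helper, not part of this patch:

/*
 * Illustrative sketch only -- not part of the patch above.
 * Reproduces the CHECK_TYPE_MASK comparison from ip_matches_v4()
 * with plain POSIX types.
 */
#include <arpa/inet.h>
#include <stdio.h>

static int v4_mask_match(const char *check, const char *net, const char *mask)
{
	struct in_addr c, n, m;

	if (inet_pton(AF_INET, check, &c) != 1 ||
	    inet_pton(AF_INET, net, &n) != 1 ||
	    inet_pton(AF_INET, mask, &m) != 1) {
		return -1;
	}
	/* same test as CHECK_TYPE_MASK: (ip & mask) == network */
	return (c.s_addr & m.s_addr) == n.s_addr;
}

int main(void)
{
	printf("%d\n", v4_mask_match("192.168.1.20", "192.168.1.0", "255.255.255.0")); /* 1 */
	printf("%d\n", v4_mask_match("10.0.0.1", "192.168.1.0", "255.255.255.0"));     /* 0 */
	return 0;
}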
diff --git a/libknet/netutils.c b/libknet/netutils.c
index 754e1984..25bba33f 100644
--- a/libknet/netutils.c
+++ b/libknet/netutils.c
@@ -1,176 +1,133 @@
/*
* Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include "internals.h"
#include "netutils.h"
-static int is_v4_mapped(const struct sockaddr_storage *ss, socklen_t salen)
+int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2)
{
- char map[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };
- struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *) ss;
- return memcmp(&addr6->sin6_addr, map, 12);
-}
-
-int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1,
- const struct sockaddr_storage *ss2, socklen_t sslen2)
-{
- int ss1_offset = 0, ss2_offset = 0;
struct sockaddr_in6 *ss1_addr6 = (struct sockaddr_in6 *)ss1;
struct sockaddr_in6 *ss2_addr6 = (struct sockaddr_in6 *)ss2;
struct sockaddr_in *ss1_addr = (struct sockaddr_in *)ss1;
struct sockaddr_in *ss2_addr = (struct sockaddr_in *)ss2;
- char *addr1, *addr2;
- if (ss1->ss_family == ss2->ss_family) {
- return memcmp(ss1, ss2, sslen1);
+ if (ss1->ss_family != ss2->ss_family) {
+ return -1;
}
if (ss1->ss_family == AF_INET6) {
- if (is_v4_mapped(ss1, sslen1)) {
- return 1;
- }
- addr1 = (char *)&ss1_addr6->sin6_addr;
- ss1_offset = 12;
- } else {
- addr1 = (char *)&ss1_addr->sin_addr;
- }
-
- if (ss2->ss_family == AF_INET6) {
- if (is_v4_mapped(ss2, sslen2)) {
- return 1;
- }
- addr2 = (char *)&ss2_addr6->sin6_addr;
- ss2_offset = 12;
- } else {
- addr2 = (char *)&ss2_addr->sin_addr;
+ return memcmp(&ss1_addr6->sin6_addr.s6_addr32, &ss2_addr6->sin6_addr.s6_addr32, sizeof(struct in6_addr));
}
- return memcmp(addr1+ss1_offset, addr2+ss2_offset, 4);
-}
-
-int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src)
-{
- struct sockaddr_in6 *dst_addr6 = (struct sockaddr_in6 *)dst;
- struct sockaddr_in6 *src_addr6 = (struct sockaddr_in6 *)src;
-
- memset(dst, 0, sizeof(struct sockaddr_storage));
-
- if (src->ss_family == AF_INET6) {
- dst->ss_family = src->ss_family;
- memmove(&dst_addr6->sin6_port, &src_addr6->sin6_port, sizeof(in_port_t));
- memmove(&dst_addr6->sin6_addr, &src_addr6->sin6_addr, sizeof(struct in6_addr));
- } else {
- memmove(dst, src, sizeof(struct sockaddr_in));
- }
- return 0;
+ return memcmp(&ss1_addr->sin_addr.s_addr, &ss2_addr->sin_addr.s_addr, sizeof(struct in_addr));
}
socklen_t sockaddr_len(const struct sockaddr_storage *ss)
{
if (ss->ss_family == AF_INET) {
return sizeof(struct sockaddr_in);
} else {
return sizeof(struct sockaddr_in6);
}
}
/*
* exported APIs
*/
int knet_strtoaddr(const char *host, const char *port, struct sockaddr_storage *ss, socklen_t sslen)
{
int err;
struct addrinfo hints;
struct addrinfo *result = NULL;
if (!host) {
errno = EINVAL;
return -1;
}
if (!port) {
errno = EINVAL;
return -1;
}
if (!ss) {
errno = EINVAL;
return -1;
}
if (!sslen) {
errno = EINVAL;
return -1;
}
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
err = getaddrinfo(host, port, &hints, &result);
if (!err) {
memmove(ss, result->ai_addr,
(sslen < result->ai_addrlen) ? sslen : result->ai_addrlen);
freeaddrinfo(result);
}
if (!err)
errno = 0;
return err;
}
int knet_addrtostr(const struct sockaddr_storage *ss, socklen_t sslen,
char *addr_buf, size_t addr_buf_size,
char *port_buf, size_t port_buf_size)
{
int err;
if (!ss) {
errno = EINVAL;
return -1;
}
if (!sslen) {
errno = EINVAL;
return -1;
}
if (!addr_buf) {
errno = EINVAL;
return -1;
}
if (!port_buf) {
errno = EINVAL;
return -1;
}
err = getnameinfo((struct sockaddr *)ss, sockaddr_len(ss),
addr_buf, addr_buf_size,
port_buf, port_buf_size,
NI_NUMERICHOST | NI_NUMERICSERV);
if (!err)
errno = 0;
return err;
}
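After this change cmpaddr() compares only the address portion of two sockaddr_storage values of the same family, while knet_strtoaddr()/knet_addrtostr() remain the exported helpers for building and printing them. A short usage sketch of those two exported calls, assuming <libknet.h> is on the include path and the program links against libknet (illustrative only, not part of this patch):

/*
 * Illustrative sketch only -- round-trips an address through the
 * exported helpers whose callers are touched by this patch.
 */
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <libknet.h>

int main(void)
{
	struct sockaddr_storage ss;
	char addr[KNET_MAX_HOST_LEN];
	char port[KNET_MAX_PORT_LEN];

	memset(&ss, 0, sizeof(ss));
	if (knet_strtoaddr("192.168.0.1", "5405", &ss, sizeof(ss)) != 0) {
		fprintf(stderr, "knet_strtoaddr failed\n");
		return 1;
	}
	if (knet_addrtostr(&ss, sizeof(ss), addr, sizeof(addr),
			   port, sizeof(port)) != 0) {
		fprintf(stderr, "knet_addrtostr failed\n");
		return 1;
	}
	printf("parsed back as %s port %s\n", addr, port);
	return 0;
}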
diff --git a/libknet/netutils.h b/libknet/netutils.h
index ee10b2b1..6395398e 100644
--- a/libknet/netutils.h
+++ b/libknet/netutils.h
@@ -1,19 +1,29 @@
/*
* Copyright (C) 2010-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_NETUTILS_H__
#define __KNET_NETUTILS_H__
#include <sys/socket.h>
+#include <netinet/in.h>
-int cmpaddr(const struct sockaddr_storage *ss1, socklen_t sslen1, const struct sockaddr_storage *ss2, socklen_t sslen2);
-int cpyaddrport(struct sockaddr_storage *dst, const struct sockaddr_storage *src);
+/*
+ * s6_addr32 is not defined in BSD userland, only kernel.
+ * definition is the same as linux and it works fine for
+ * what we need.
+ */
+
+#ifndef s6_addr32
+#define s6_addr32 __u6_addr.__u6_addr32
+#endif
+
+int cmpaddr(const struct sockaddr_storage *ss1, const struct sockaddr_storage *ss2);
socklen_t sockaddr_len(const struct sockaddr_storage *ss);
#endif
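The s6_addr32 fallback moves into netutils.h so that any libknet file comparing IPv6 addresses word by word builds on the BSDs as well as Linux. A minimal sketch of that word-wise access, assuming the same fallback define (illustrative only, not part of this patch):

/*
 * Illustrative sketch only -- word-wise access to an in6_addr,
 * with the same BSD fallback for s6_addr32 that netutils.h now provides.
 */
#include <stdio.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#ifndef s6_addr32
#define s6_addr32 __u6_addr.__u6_addr32
#endif

int main(void)
{
	struct in6_addr a;
	int i;

	if (inet_pton(AF_INET6, "2001:db8::1", &a) != 1) {
		return 1;
	}
	for (i = 0; i < 4; i++) {
		printf("word %d: 0x%08x\n", i, (unsigned int)ntohl(a.s6_addr32[i]));
	}
	return 0;
}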
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index 91f797e7..ad9d9fd3 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,1057 +1,1062 @@
/*
* Copyright (C) 2012-2020 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this function needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer handling the current seq_num,
* either it's new or it's been reclaimed already.
* Check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point in trying to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_MAX_LINK; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not in use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event that the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymmetric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
if (defrag_buf->frag_size) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
}
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet length
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0, stats_err = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
uint64_t decrypt_time = 0;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_out[1];
int8_t channel;
- struct sockaddr_storage pckt_src;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
- int try_decrypt = 0, decrypted = 0, i;
+ int try_decrypt = 0, decrypted = 0, i, found_link = 0;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
try_decrypt = 1;
break;
}
}
if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
return;
}
if (try_decrypt) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
return;
}
log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
} else {
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &decrypt_time);
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
decrypted = 1;
}
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
- src_link = src_host->link +
- (inbuf->khp_ping_link % KNET_MAX_LINK);
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
+ /* be aware this works only for PING / PONG and PMTUd packets! */
+ src_link = src_host->link +
+ (inbuf->khp_ping_link % KNET_MAX_LINK);
if (src_link->dynamic == KNET_LINK_DYNIP) {
- /*
- * cpyaddrport will only copy address and port of the incoming
- * packet and strip extra bits such as flow and scopeid
- */
- cpyaddrport(&pckt_src, msg->msg_hdr.msg_name);
-
- if (cmpaddr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
- &pckt_src, sockaddr_len(&pckt_src)) != 0) {
+ if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
- memmove(&src_link->dst_addr, &pckt_src, sizeof(struct sockaddr_storage));
- if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(msg->msg_hdr.msg_name),
+ memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
+ if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
+ } else { /* data packet */
+ for (i = 0; i < KNET_MAX_LINK; i++) {
+ src_link = &src_host->link[i];
+ if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
+ found_link = 1;
+ break;
+ }
+ }
+ if (!found_link) {
+ log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
+ return;
+ }
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
src_host->host_id, src_link->link_id, strerror(savederrno));
return;
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_HOST_INFO:
case KNET_HEADER_TYPE_DATA:
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and re-add it only for re-padding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (!err) {
/* Collect stats */
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
knet_h->stats.rx_failed_to_decompress++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
}
if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
} else { /* HOSTINFO */
knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata;
if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) {
knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id);
}
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
switch(knet_hostinfo->khi_type) {
case KNET_HOSTINFO_TYPE_LINK_UP_DOWN:
break;
case KNET_HOSTINFO_TYPE_LINK_TABLE:
break;
default:
log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id);
break;
}
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed ping, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrive in bursts over all the links;
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pong_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
retry_pong:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
}
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
if ((latency_last / 1000llu) > src_link->pong_timeout) {
log_debug(knet_h, KNET_SUB_RX,
"Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
src_host->host_id, src_link->link_id);
} else {
/*
* in words: (('previous mean' * ('count' - 1)) + 'new value') / 'count'
*/
src_link->status.stats.latency_samples++;
/*
* limit to max_samples (precision)
*/
if (src_link->status.stats.latency_samples >= src_link->latency_max_samples) {
src_link->status.stats.latency_samples = src_link->latency_max_samples;
}
src_link->status.stats.latency_ave =
(((src_link->status.stats.latency_ave * (src_link->status.stats.latency_samples - 1)) + (latency_last / 1000llu)) / src_link->status.stats.latency_samples);
if (src_link->status.stats.latency_ave < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.stats.latency_ave > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.stats.latency_ave;
}
if (src_link->status.stats.latency_ave < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.stats.latency_ave;
}
}
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if an fd got an event but the link was removed
* by another thread before we grabbed the read lock
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving one or more frames, the recvmmsg
* loop is interrupted, the error is recorded in the socket (getsockopt(SO_ERROR)) and
* it will be visible in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol base
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0-byte message... go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocols' packet handlers are good
*/
switch(err) {
case KNET_TRANSPORT_RX_ERROR: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
/*
* processing incoming packets vs access lists
*/
if ((knet_h->use_access_lists) &&
(transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
char src_ipaddr[KNET_MAX_HOST_LEN];
char src_port[KNET_MAX_PORT_LEN];
memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
memset(src_port, 0, KNET_MAX_PORT_LEN);
if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
src_ipaddr, KNET_MAX_HOST_LEN,
src_port, KNET_MAX_PORT_LEN) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
} else {
log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
}
/*
* continue processing the other packets
*/
continue;
}
}
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
break;
case KNET_TRANSPORT_RX_OOB_DATA_STOP:
log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
goto exit_unlock;
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
memset(&msg, 0, sizeof(msg));
memset(&events, 0, sizeof(events));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);
/*
* the RX threads only need to notify that there has been at least
* one successful run after queue flush has been requested.
* See setfwd in handle.c
*/
if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
return NULL;
}
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
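knet_recv() above is the exported read side of a data channel: it validates its arguments, takes the global read lock and readv()s one packet from the channel's local socket, returning -1 with errno set on failure. A hedged usage sketch, assuming knet_h and channel come from an already-configured handle (setup omitted, illustrative only, not part of this patch):

/*
 * Illustrative sketch only -- reads one packet off a data channel
 * with the exported knet_recv(); handle configuration is assumed
 * to have happened elsewhere.
 */
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <libknet.h>

ssize_t read_one_packet(knet_handle_t knet_h, int8_t channel)
{
	char buff[KNET_MAX_PACKET_SIZE];
	ssize_t len;

	len = knet_recv(knet_h, buff, sizeof(buff), channel);
	if (len < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			return 0;	/* nothing queued on this channel right now */
		}
		fprintf(stderr, "knet_recv: %s\n", strerror(errno));
		return -1;
	}
	return len;	/* bytes delivered to buff */
}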