diff --git a/libknet/internals.h b/libknet/internals.h
index c3f484fd..5ddab9a8 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,566 +1,567 @@
/*
* Copyright (C) 2010-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_INTERNALS_H__
#define __KNET_INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include <pthread.h>
#include "libknet.h"
#include "onwire.h"
#include "compat.h"
#include "threads_common.h"
#define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward declaration because of circular dependency */
struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned long long pong_timeout_adj; /* timeout adjusted for latency */
uint8_t pong_timeout_backoff; /* see link.h for definition */
unsigned int latency_fix; /* precision */
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
uint64_t flags;
/* status */
struct knet_link_status status;
/* internals */
uint8_t link_id;
uint8_t transport; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
int outsock;
unsigned int configured:1; /* set to 1 if src/dst have been configured and the transport initialized on this link */
unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */
unsigned int latency_exp;
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused
with stats.proto_overhead, which also includes knet headers
and crypto headers */
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
+ uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */
uint8_t has_valid_mtu;
};
#define KNET_CBUFFER_SIZE 4096
struct knet_host_defrag_buf {
char buf[KNET_DATABUFSIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* per-fragment map of what we have received */
uint8_t last_first; /* special case if we receive the last fragment first */
uint16_t frag_size; /* normal frag size (not the last one) */
uint16_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
struct knet_host {
/* required */
knet_node_id_t host_id;
/* configurable */
uint8_t link_handler_policy;
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/* internals */
char circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t rx_seq_num;
seq_num_t untimed_rx_seq_num;
seq_num_t timed_rx_seq_num;
uint8_t got_data;
/* defrag/reassembly buffers */
struct knet_host_defrag_buf defrag_buf[KNET_MAX_LINK];
char circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next;
};
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's in use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
struct knet_fd_trackers {
uint8_t transport; /* transport type (UDP/SCTP...) */
uint8_t data_type; /* internal use for transport to define what data are associated
* with this fd */
void *data; /* pointer to the data */
void *access_list_match_entry_head; /* pointer to access list match_entry list head */
};
#define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
#define KNET_MAX_COMPRESS_METHODS UINT8_MAX
struct knet_handle_stats_extra {
uint64_t tx_crypt_pmtu_packets;
uint64_t tx_crypt_pmtu_reply_packets;
uint64_t tx_crypt_ping_packets;
uint64_t tx_crypt_pong_packets;
};
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int hostsockfd[2];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */
unsigned int pmtud_interval;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
struct knet_handle_stats stats;
struct knet_handle_stats_extra stats_extra;
uint32_t reconnect_int;
knet_node_id_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_header *recv_from_sock_buf;
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
uint8_t threads_flush_queue[KNET_THREAD_MAX];
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */
uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */
int pmtud_waiting;
int pmtud_running;
int pmtud_forcerun;
int pmtud_abort;
struct crypto_instance *crypto_instance;
size_t sec_block_size;
size_t sec_hash_size;
size_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
int compress_model;
int compress_level;
size_t compress_threshold;
void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */
unsigned char *recv_from_links_buf_decompress;
unsigned char *send_to_links_buf_compress;
seq_num_t tx_seq_num;
pthread_mutex_t tx_seq_num_mutex;
uint8_t has_loop_link;
uint8_t loop_link;
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
uint64_t flags;
};
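
The handle stores the application callbacks directly as function pointers; dst_host_filter_fn above decides where each packet goes. As a hedged illustration only (not part of this patch), a filter matching that signature could look like the sketch below. The return-value convention used here (0 = unicast with dst_host_ids filled in, 1 = broadcast, -1 = drop) is taken from libknet's public documentation and is an assumption as far as this excerpt is concerned.

/* illustrative only: a dst_host_filter_fn matching the pointer above */
static int example_dst_host_filter(void *private_data,
				   const unsigned char *outdata,
				   ssize_t outdata_len,
				   uint8_t tx_rx,
				   knet_node_id_t this_host_id,
				   knet_node_id_t src_node_id,
				   int8_t *channel,
				   knet_node_id_t *dst_host_ids,
				   size_t *dst_host_ids_entries)
{
	if (outdata_len < 1) {
		return -1;				/* malformed payload, drop it */
	}
	if (outdata[0] == 0xff) {
		return 1;				/* treat as broadcast */
	}
	dst_host_ids[0] = (knet_node_id_t)outdata[0];	/* first byte carries the destination id */
	*dst_host_ids_entries = 1;
	return 0;					/* unicast to a single node */
}
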
extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */
/*
* NOTE: every single operation must be implemented
* for every protocol.
*/
/*
* for now knet supports only IP protocols (udp/sctp)
* in future there might be others like ARP
* or TIPC.
* keep this around as transport information
* to use for access lists and other operations
*/
#define TRANSPORT_PROTO_LOOPBACK 0
#define TRANSPORT_PROTO_IP_PROTO 1
/*
* some transports like SCTP can filter incoming
* connections before knet has to process
* any packets.
* GENERIC_ACL -> packet has to be read and filtered
* PROTO_ACL -> transport provides filtering at lower levels
* and packet does not need to be processed
*/
typedef enum {
USE_NO_ACL,
USE_GENERIC_ACL,
USE_PROTO_ACL
} transport_acl;
/*
* make it easier to map values in transports.c
*/
#define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0
#define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1
typedef struct knet_transport_ops {
/*
* transport generic information
*/
const char *transport_name;
const uint8_t transport_id;
const uint8_t built_in;
uint8_t transport_protocol;
transport_acl transport_acl_type;
/*
* connection oriented protocols like SCTP
* don't need dst_addr in sendto calls and
* on some OSes passing it returns EINVAL.
*/
uint8_t transport_is_connection_oriented;
uint32_t transport_mtu_overhead;
/*
* transport init must allocate the new transport
* and perform all internal initializations
* (threads, lists, etc).
*/
int (*transport_init)(knet_handle_t knet_h);
/*
* transport_free must release _all_ resources
* allocated by transport_init
*/
int (*transport_free)(knet_handle_t knet_h);
/*
* link operations should take care of all the
* sockets and epoll management for a given link/transport set
* transport_link_disable should return err = -1 and errno = EBUSY
* if listener is still in use, and any other errno in case
* the link cannot be disabled.
*
* set_config/clear_config are invoked in global write lock context
*/
int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);
/*
* transport callback for incoming dynamic connections
* this is called in global read lock context
*/
int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);
/*
* return the fd to use for access lists
*/
int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link);
/*
* per transport error handling of recvmmsg
* (see _handle_recv_from_links comments for details)
*/
/*
* transport_rx_sock_error is invoked when recvmmsg returns <= 0
*
* transport_rx_sock_error is invoked with both global_rdlock
* and fd_tracker read lock (from the RX thread)
*/
int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* transport_tx_sock_error is invoked with global_rwlock held and
* it's invoked when sendto or sendmmsg returns <= 0
*
* it should return:
* -1 on internal error
* 0 ignore error and continue
* 1 retry
* any sleep or wait action should happen inside the transport code
*/
int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* this function is called on _every_ received packet
* to determine whether the packet is data or internal protocol/error handling traffic
*
* it should return:
* -1 on error
* 0 packet is not data and we should continue the packet process loop
* 1 packet is not data and we should STOP the packet process loop
* 2 packet is data and should be parsed as such
*
* transport_rx_is_data is invoked with both global_rwlock
* and fd_tracker read lock (from RX thread)
*/
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
} knet_transport_ops_t;
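
To make the return-code contract above concrete, here is a hedged sketch of a TX path consuming transport_tx_sock_error() exactly as documented (-1 fail, 0 ignore and continue, 1 retry the same data); _handle_check_link_pmtud() later in this patch follows the same pattern. The helper name is invented for illustration, and the always-passed destination address is a simplification; connection-oriented transports would pass NULL, as noted above.

static int example_send_with_retry(knet_handle_t knet_h, struct knet_link *link,
				   const unsigned char *buf, size_t buf_len)
{
	ssize_t len;
	int err, savederrno;

retry:
	len = sendto(link->outsock, buf, buf_len, MSG_DONTWAIT | MSG_NOSIGNAL,
		     (struct sockaddr *)&link->dst_addr, sizeof(struct sockaddr_storage));
	savederrno = errno;

	err = transport_tx_sock_error(knet_h, link->transport, link->outsock, len, savederrno);
	switch (err) {
	case -1:		/* unrecoverable error: give up on this packet */
		return -1;
	case 0:			/* error handled (or no error): keep going */
		break;
	case 1:			/* transient error: retry the very same data */
		goto retry;
	}

	return (len == (ssize_t)buf_len) ? 0 : -1;
}
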
socklen_t sockaddr_len(const struct sockaddr_storage *ss);
struct pretty_names {
const char *name;
uint8_t val;
};
/**
* This is a kernel style list implementation.
*
* @author Steven Dake <sdake@redhat.com>
*/
struct knet_list_head {
struct knet_list_head *next;
struct knet_list_head *prev;
};
/**
* @def KNET_LIST_DECLARE()
* Declare and initialize a list head.
*/
#define KNET_LIST_DECLARE(name) \
struct knet_list_head name = { &(name), &(name) }
#define KNET_INIT_LIST_HEAD(ptr) do { \
(ptr)->next = (ptr); (ptr)->prev = (ptr); \
} while (0)
/**
* Initialize the list entry.
*
* Points next and prev pointers to head.
* @param head pointer to the list head
*/
static inline void knet_list_init(struct knet_list_head *head)
{
head->next = head;
head->prev = head;
}
/**
* Add this element to the list.
*
* @param element the new element to insert.
* @param head pointer to the list head
*/
static inline void knet_list_add(struct knet_list_head *element,
struct knet_list_head *head)
{
head->next->prev = element;
element->next = head->next;
element->prev = head;
head->next = element;
}
/**
* Add an element to the end of the list.
*
* @param element pointer to the element to add
* @param head pointer to the list head
* @see knet_list_add()
*/
static inline void knet_list_add_tail(struct knet_list_head *element,
struct knet_list_head *head)
{
head->prev->next = element;
element->next = head;
element->prev = head->prev;
head->prev = element;
}
/**
* Delete an entry from the list.
*
* @param _remove the list item to remove
*/
static inline void knet_list_del(struct knet_list_head *_remove)
{
_remove->next->prev = _remove->prev;
_remove->prev->next = _remove->next;
}
/**
* Replace an old entry with a new one.
* @param old: the element to be replaced
* @param new: the new element to insert
*/
static inline void knet_list_replace(struct knet_list_head *old,
struct knet_list_head *new)
{
new->next = old->next;
new->next->prev = new;
new->prev = old->prev;
new->prev->next = new;
}
/**
* Tests whether list is the last entry in list head
* @param list: the entry to test
* @param head: the head of the list
* @return boolean true/false
*/
static inline int knet_list_is_last(const struct knet_list_head *list,
const struct knet_list_head *head)
{
return list->next == head;
}
/**
* A quick test to see if the list is empty (pointing to itself).
* @param head pointer to the list head
* @return boolean true/false
*/
static inline int32_t knet_list_empty(const struct knet_list_head *head)
{
return head->next == head;
}
/**
* Get the struct for this entry
* @param ptr: the &struct list_head pointer.
* @param type: the type of the struct this is embedded in.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_entry(ptr,type,member)\
((type *)((char *)(ptr)-(char*)(&((type *)0)->member)))
/**
* Get the first element from a list
* @param ptr: the &struct list_head pointer.
* @param type: the type of the struct this is embedded in.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_first_entry(ptr, type, member) \
knet_list_entry((ptr)->next, type, member)
/**
* Iterate over a list
* @param pos: the &struct list_head to use as a loop counter.
* @param head: the head for your list.
*/
#define knet_list_for_each(pos, head) \
for (pos = (head)->next; pos != (head); pos = pos->next)
/**
* Iterate over a list backwards
* @param pos: the &struct list_head to use as a loop counter.
* @param head: the head for your list.
*/
#define knet_list_for_each_reverse(pos, head) \
for (pos = (head)->prev; pos != (head); pos = pos->prev)
/**
* Iterate over a list safe against removal of list entry
* @param pos: the &struct list_head to use as a loop counter.
* @param n: another &struct list_head to use as temporary storage
* @param head: the head for your list.
*/
#define knet_list_for_each_safe(pos, n, head) \
for (pos = (head)->next, n = pos->next; pos != (head); \
pos = n, n = pos->next)
/**
* Iterate over list of given type
* @param pos: the type * to use as a loop counter.
* @param head: the head for your list.
* @param member: the name of the list_struct within the struct.
*/
#define knet_list_for_each_entry(pos, head, member) \
for (pos = knet_list_entry((head)->next, typeof(*pos), member); \
&pos->member != (head); \
pos = knet_list_entry(pos->member.next, typeof(*pos), member))
#endif
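
A hedged usage sketch for the kernel-style list above (illustration only, not part of internals.h): embed a knet_list_head in your own structure, add a couple of entries, then walk them with knet_list_for_each_entry().

struct example_entry {
	int value;
	struct knet_list_head list;		/* embedded list node */
};

static int example_sum_entries(void)
{
	KNET_LIST_DECLARE(head);		/* head points at itself: empty list */
	struct example_entry a = { .value = 1 };
	struct example_entry b = { .value = 2 };
	struct example_entry *pos;
	int sum = 0;

	knet_list_add_tail(&a.list, &head);	/* head -> a */
	knet_list_add_tail(&b.list, &head);	/* head -> a -> b */

	knet_list_for_each_entry(pos, &head, list) {
		sum += pos->value;		/* visits a, then b */
	}
	return sum;				/* 3 */
}
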
diff --git a/libknet/links.c b/libknet/links.c
index 9b75be27..cd18d9cc 100644
--- a/libknet/links.c
+++ b/libknet/links.c
@@ -1,1518 +1,1519 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include "internals.h"
#include "logging.h"
#include "links.h"
#include "transports.h"
#include "host.h"
#include "threads_common.h"
#include "links_acl.h"
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled, unsigned int connected)
{
struct knet_link *link = &knet_h->host_index[host_id]->link[link_id];
if ((link->status.enabled == enabled) &&
(link->status.connected == connected))
return 0;
link->status.enabled = enabled;
link->status.connected = connected;
_host_dstcache_update_async(knet_h, knet_h->host_index[host_id]);
if ((link->status.dynconnected) &&
(!link->status.connected))
link->status.dynconnected = 0;
if (connected) {
time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
link->status.stats.up_count++;
if (++link->status.stats.last_up_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_up_time_index = 0;
}
} else {
time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]);
link->status.stats.down_count++;
if (++link->status.stats.last_down_time_index > MAX_LINK_EVENTS) {
link->status.stats.last_down_time_index = 0;
}
}
return 0;
}
void _link_clear_stats(knet_handle_t knet_h)
{
struct knet_host *host;
struct knet_link *link;
uint32_t host_id;
uint8_t link_id;
for (host_id = 0; host_id < KNET_MAX_HOST; host_id++) {
host = knet_h->host_index[host_id];
if (!host) {
continue;
}
for (link_id = 0; link_id < KNET_MAX_LINK; link_id++) {
link = &host->link[link_id];
memset(&link->status.stats, 0, sizeof(struct knet_link_stats));
}
}
}
int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint64_t flags)
{
int savederrno = 0, err = 0, i;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (dst_addr && (src_addr->ss_family != dst_addr->ss_family)) {
log_err(knet_h, KNET_SUB_LINK, "Source address family does not match destination address family");
errno = EINVAL;
return -1;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
if (knet_h->host_id == host_id && knet_h->has_loop_link) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) {
for (i=0; i<KNET_MAX_LINK; i++) {
if (host->link[i].configured) {
log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured.");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
}
}
link = &host->link[link_id];
if (link->configured != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(&link->src_addr, src_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
} else {
link->dynamic = KNET_LINK_STATIC;
memmove(&link->dst_addr, dst_addr, sizeof(struct sockaddr_storage));
err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
}
+ link->pmtud_crypto_timeout_multiplier = KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN;
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */
link->latency_fix = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_exp = KNET_LINK_DEFAULT_PING_PRECISION - \
((link->ping_interval * KNET_LINK_DEFAULT_PING_PRECISION) / 8000000);
link->flags = flags;
if (transport_link_set_config(knet_h, link, transport) < 0) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
/*
* we can only configure default access lists if we know both endpoints
* and the protocol uses GENERIC_ACL, otherwise the protocol has
* to setup their own access lists above in transport_link_set_config.
*/
if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
(link->dynamic == KNET_LINK_STATIC)) {
log_debug(knet_h, KNET_SUB_LINK, "Configuring default access lists for host: %u link: %u socket: %d",
host_id, link_id, link->outsock);
if ((check_add(knet_h, link->outsock, transport, -1,
&link->dst_addr, &link->dst_addr,
CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
log_warn(knet_h, KNET_SUB_LINK, "Failed to configure default access lists for host: %u link: %u", host_id, link_id);
savederrno = errno;
err = -1;
goto exit_unlock;
}
}
link->configured = 1;
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured",
host_id, link_id);
if (transport == KNET_TRANSPORT_LOOPBACK) {
knet_h->has_loop_link = 1;
knet_h->loop_link = link_id;
host->status.reachable = 1;
link->status.mtu = KNET_PMTUD_SIZE_V6;
} else {
/*
* calculate the minimum MTU that is safe to use,
* based on RFCs and that each network device should
* be able to support without any troubles
*/
if (link->dynamic == KNET_LINK_STATIC) {
/*
* with static link we can be more precise than using
* the generic calc_min_mtu()
*/
switch (link->dst_addr.ss_family) {
case AF_INET6:
link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V6 - (KNET_PMTUD_OVERHEAD_V6 + link->proto_overhead));
break;
case AF_INET:
link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V4 - (KNET_PMTUD_OVERHEAD_V4 + link->proto_overhead));
break;
}
} else {
/*
* for dynamic links we start with the minimum MTU
* possible and PMTUd will kick in immediately
* after connection status is 1
*/
link->status.mtu = calc_min_mtu(knet_h);
}
link->has_valid_mtu = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
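
For context, a hedged caller-side sketch of the function above, configuring a static link between two nodes. knet_strtoaddr() and KNET_TRANSPORT_UDP come from the public libknet API and are assumptions as far as this excerpt is concerned; knet_link_set_enable() is the function defined further down in this file.

static int example_setup_static_link(knet_handle_t knet_h, knet_node_id_t peer_id)
{
	struct sockaddr_storage src, dst;

	memset(&src, 0, sizeof(src));
	memset(&dst, 0, sizeof(dst));

	if ((knet_strtoaddr("192.168.0.1", "6666", &src, sizeof(src)) < 0) ||
	    (knet_strtoaddr("192.168.0.2", "6666", &dst, sizeof(dst)) < 0)) {
		return -1;
	}

	/*
	 * both endpoints are known, so link->dynamic becomes KNET_LINK_STATIC
	 * and default access lists are installed for GENERIC_ACL transports
	 */
	if (knet_link_set_config(knet_h, peer_id, 0, KNET_TRANSPORT_UDP,
				 &src, &dst, 0) < 0) {
		return -1;
	}

	return knet_link_set_enable(knet_h, peer_id, 0, 1);
}
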
int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic,
uint64_t *flags)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (!dynamic) {
errno = EINVAL;
return -1;
}
if (!transport) {
errno = EINVAL;
return -1;
}
if (!flags) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) {
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage));
*transport = link->transport;
*flags = link->flags;
if (link->dynamic == KNET_LINK_STATIC) {
*dynamic = 0;
memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage));
} else {
*dynamic = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
int sock;
uint8_t transport;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (link->configured != 1) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/*
* remove well known access lists here.
* After the transport has done clearing the config,
* then we can remove any leftover access lists if the link
* is no longer in use.
*/
if ((transport_get_acl_type(knet_h, link->transport) == USE_GENERIC_ACL) &&
(link->dynamic == KNET_LINK_STATIC)) {
if ((check_rm(knet_h, link->outsock, link->transport,
&link->dst_addr, &link->dst_addr,
CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
err = -1;
savederrno = errno;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u: unable to remove default access list",
host_id, link_id);
goto exit_unlock;
}
}
/*
* cache it for later as we don't know if the transport
* will clear link info during clear_config.
*/
sock = link->outsock;
transport = link->transport;
if ((transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
/*
* remove any other access lists when the socket is no
* longer in use by the transport.
*/
if ((transport_get_acl_type(knet_h, link->transport) == USE_GENERIC_ACL) &&
(knet_h->knet_transport_fd_tracker[sock].transport == KNET_MAX_TRANSPORTS)) {
check_rmall(knet_h, sock, transport);
}
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
if (knet_h->has_loop_link && host_id == knet_h->host_id && link_id == knet_h->loop_link) {
knet_h->has_loop_link = 0;
if (host->active_link_entries == 0) {
host->status.reachable = 0;
}
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected);
savederrno = errno;
if (enabled) {
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int *enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!enabled) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*enabled = link->status.enabled;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (pong_count < 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->pong_count = pong_count;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u pong count update: %u",
host_id, link_id, link->pong_count);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!pong_count) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*pong_count = link->pong_count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = ENOSYS;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->ping_interval = interval * 1000; /* microseconds */
link->pong_timeout = timeout * 1000; /* microseconds */
link->latency_fix = precision;
link->latency_exp = precision - \
((link->ping_interval * precision) / 8000000);
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
host_id, link_id, link->ping_interval, link->pong_timeout, precision);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
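
A worked example of the conversion above, with values chosen purely for illustration: calling knet_link_set_ping_timers() with interval = 1000 ms, timeout = 2000 ms and precision = 2048 stores ping_interval = 1,000,000 us and pong_timeout = 2,000,000 us, and sets latency_exp = 2048 - (1,000,000 * 2048) / 8,000,000 = 2048 - 256 = 1792. knet_link_get_ping_timers() below converts the stored microseconds back to milliseconds before returning them.
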
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*interval = link->ping_interval / 1000; /* microseconds */
*timeout = link->pong_timeout / 1000;
*precision = link->latency_fix;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
uint8_t old_priority;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
old_priority = link->priority;
if (link->priority == priority) {
err = 0;
goto exit_unlock;
}
link->priority = priority;
if (_host_dstcache_update_sync(knet_h, host)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_LINK,
"Unable to update link priority (host: %u link: %u priority: %u): %s",
host_id, link_id, link->priority, strerror(savederrno));
link->priority = old_priority;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u priority set to: %u",
host_id, link_id, link->priority);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!priority) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*priority = link->priority;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *link_ids, size_t *link_ids_entries)
{
int savederrno = 0, err = 0, i, count = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!link_ids) {
errno = EINVAL;
return -1;
}
if (!link_ids_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
for (i = 0; i < KNET_MAX_LINK; i++) {
link = &host->link[i];
if (!link->configured) {
continue;
}
link_ids[count] = i;
count++;
}
*link_ids_entries = count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct knet_link_status *status, size_t struct_size)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
memmove(status, &link->status, struct_size);
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
status->stats.rx_ping_packets +
status->stats.rx_pong_packets +
status->stats.rx_pmtu_packets;
status->stats.tx_total_packets =
status->stats.tx_data_packets +
status->stats.tx_ping_packets +
status->stats.tx_pong_packets +
status->stats.tx_pmtu_packets;
status->stats.rx_total_bytes =
status->stats.rx_data_bytes +
status->stats.rx_ping_bytes +
status->stats.rx_pong_bytes +
status->stats.rx_pmtu_bytes;
status->stats.tx_total_bytes =
status->stats.tx_data_bytes +
status->stats.tx_ping_bytes +
status->stats.tx_pong_bytes +
status->stats.tx_pmtu_bytes;
status->stats.tx_total_errors =
status->stats.tx_data_errors +
status->stats.tx_ping_errors +
status->stats.tx_pong_errors +
status->stats.tx_pmtu_errors;
status->stats.tx_total_retries =
status->stats.tx_data_retries +
status->stats.tx_ping_retries +
status->stats.tx_pong_retries +
status->stats.tx_pmtu_retries;
/* Tell the caller our full size in case they have an old version */
status->size = sizeof(struct knet_link_status);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
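
A hedged usage sketch of the function above (illustration only): the struct_size argument lets callers built against an older, smaller knet_link_status still work, since only struct_size bytes are copied and the function then reports its full size in status.size. The sketch assumes <stdio.h> and <string.h> are available.

static void example_print_link_status(knet_handle_t knet_h,
				      knet_node_id_t host_id, uint8_t link_id)
{
	struct knet_link_status status;

	memset(&status, 0, sizeof(status));
	if (knet_link_get_status(knet_h, host_id, link_id,
				 &status, sizeof(status)) < 0) {
		return;
	}

	printf("host %u link %u: enabled=%u connected=%u mtu=%u rx_pkts=%llu tx_pkts=%llu\n",
	       (unsigned int)host_id, (unsigned int)link_id,
	       (unsigned int)status.enabled, (unsigned int)status.connected,
	       (unsigned int)status.mtu,
	       (unsigned long long)status.stats.rx_total_packets,
	       (unsigned long long)status.stats.tx_total_packets);
}
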
int knet_link_add_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!ss1) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) &&
(type != CHECK_TYPE_MASK) &&
(type != CHECK_TYPE_RANGE)) {
errno = EINVAL;
return -1;
}
if ((acceptreject != CHECK_ACCEPT) &&
(acceptreject != CHECK_REJECT)) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
errno = EINVAL;
return -1;
}
if ((type == CHECK_TYPE_RANGE) &&
(ss1->ss_family != ss2->ss_family)) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
err = check_add(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport, -1,
ss1, ss2, type, acceptreject);
savederrno = errno;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_insert_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
int index,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!ss1) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) &&
(type != CHECK_TYPE_MASK) &&
(type != CHECK_TYPE_RANGE)) {
errno = EINVAL;
return -1;
}
if ((acceptreject != CHECK_ACCEPT) &&
(acceptreject != CHECK_REJECT)) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
errno = EINVAL;
return -1;
}
if ((type == CHECK_TYPE_RANGE) &&
(ss1->ss_family != ss2->ss_family)) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
err = check_add(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport, index,
ss1, ss2, type, acceptreject);
savederrno = errno;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_rm_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (!ss1) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) &&
(type != CHECK_TYPE_MASK) &&
(type != CHECK_TYPE_RANGE)) {
errno = EINVAL;
return -1;
}
if ((acceptreject != CHECK_ACCEPT) &&
(acceptreject != CHECK_REJECT)) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
errno = EINVAL;
return -1;
}
if ((type == CHECK_TYPE_RANGE) &&
(ss1->ss_family != ss2->ss_family)) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
err = check_rm(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport,
ss1, ss2, type, acceptreject);
savederrno = errno;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_clear_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
check_rmall(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
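
To round off the ACL functions above, a hedged usage sketch (not part of this patch): access lists can only be managed on dynamic (KNET_LINK_DYNIP) links, as enforced by the checks above. knet_strtoaddr() is assumed from the public libknet API, and the evaluation order of accept/reject rules is not shown in this excerpt, so the ordering below is illustrative only.

static int example_setup_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
	struct sockaddr_storage low, high, banned;

	if ((knet_strtoaddr("192.168.0.1",   "0", &low,    sizeof(low))    < 0) ||
	    (knet_strtoaddr("192.168.0.254", "0", &high,   sizeof(high))   < 0) ||
	    (knet_strtoaddr("192.168.0.13",  "0", &banned, sizeof(banned)) < 0)) {
		return -1;
	}

	/* reject a single address: CHECK_TYPE_ADDRESS does not require ss2 */
	if (knet_link_add_acl(knet_h, host_id, link_id, &banned, NULL,
			      CHECK_TYPE_ADDRESS, CHECK_REJECT) < 0) {
		return -1;
	}

	/* accept a whole range: both endpoints must share the same address family */
	return knet_link_add_acl(knet_h, host_id, link_id, &low, &high,
				 CHECK_TYPE_RANGE, CHECK_ACCEPT);
}
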
diff --git a/libknet/links.h b/libknet/links.h
index e14958de..c8ca6109 100644
--- a/libknet/links.h
+++ b/libknet/links.h
@@ -1,38 +1,48 @@
/*
* Copyright (C) 2012-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_LINK_H__
#define __KNET_LINK_H__
#include "internals.h"
#define KNET_LINK_STATIC 0 /* link has static ip on both ends */
#define KNET_LINK_DYNIP 1 /* link has dynamic destination ip */
/*
* number of iterations to reduce pong_timeout_adj
* from the configured (pong_timeout * KNET_LINK_PONG_TIMEOUT_BACKOFF)
* down to pong_timeout
*/
#define KNET_LINK_PONG_TIMEOUT_BACKOFF 10
/*
* when adjusting link pong_timeout for latency,
* multiply the max recorded latency by this number.
* Yes it's a bit of magic, fairy dust and unicorn farts
* mixed together.
*/
#define KNET_LINK_PONG_TIMEOUT_LAT_MUL 2
+/*
+ * under heavy load with crypto enabled, it takes much
+ * longer to receive a response from the other node.
+ *
+ * 128 is a somewhat arbitrary number, but we want to set a limit
+ * and report failures after that.
+ */
+#define KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN 2
+#define KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX 128
+
int _link_updown(knet_handle_t knet_h, knet_node_id_t node_id, uint8_t link_id,
unsigned int enabled, unsigned int connected);
void _link_clear_stats(knet_handle_t knet_h);
#endif
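
The two new bounds above are consumed through the pmtud_crypto_timeout_multiplier field added to struct knet_link in this patch; the actual adjustment logic lives in threads_pmtud.c, partly beyond this excerpt. As a hedged sketch of the intent only: on a PMTUd timeout with crypto enabled the per-link multiplier is expected to grow towards the MAX bound (stretching the pong-based wait), and to be reduced back towards MIN once replies arrive quickly again.

/* illustrative sketch only; the real adjustment is done inside the PMTUd thread */
static void example_adjust_pmtud_crypto_multiplier(struct knet_link *link, int timed_out)
{
	if (timed_out) {
		if (link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX) {
			link->pmtud_crypto_timeout_multiplier *= 2;	/* back off: wait longer next time */
		}
	} else if (link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN) {
		link->pmtud_crypto_timeout_multiplier /= 2;		/* load dropped: tighten the timeout again */
	}
}
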
diff --git a/libknet/threads_pmtud.c b/libknet/threads_pmtud.c
index 1dd17882..d342697e 100644
--- a/libknet/threads_pmtud.c
+++ b/libknet/threads_pmtud.c
@@ -1,542 +1,580 @@
/*
* Copyright (C) 2015-2019 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "crypto.h"
#include "links.h"
#include "host.h"
#include "logging.h"
#include "transports.h"
#include "threads_common.h"
#include "threads_pmtud.h"
static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
{
int err, ret, savederrno, mutex_retry_limit, failsafe, use_kernel_mtu, warn_once;
uint32_t kernel_mtu; /* record kernel_mtu from EMSGSIZE */
size_t onwire_len; /* current packet onwire size */
size_t ipproto_overhead_len; /* onwire packet overhead (protocol based) */
size_t max_mtu_len; /* max mtu for protocol */
size_t data_len; /* how much data we can send in the packet
* generally would be onwire_len - ipproto_overhead_len
* needs to be adjusted for crypto
*/
size_t app_mtu_len; /* real data that we can send onwire */
ssize_t len; /* len of what we were able to sendto onwire */
- struct timespec ts;
- unsigned long long pong_timeout_adj_tmp;
+ struct timespec ts, pmtud_crypto_start_ts, pmtud_crypto_stop_ts;
+ unsigned long long pong_timeout_adj_tmp, timediff;
+ int pmtud_crypto_reduce = 1;
unsigned char *outbuf = (unsigned char *)knet_h->pmtudbuf;
warn_once = 0;
mutex_retry_limit = 0;
failsafe = 0;
knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
max_mtu_len = KNET_PMTUD_SIZE_V6;
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead;
break;
case AF_INET:
max_mtu_len = KNET_PMTUD_SIZE_V4;
ipproto_overhead_len = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead;
break;
default:
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unknown protocol");
return -1;
break;
}
dst_link->last_bad_mtu = 0;
dst_link->last_good_mtu = dst_link->last_ping_size + ipproto_overhead_len;
/*
* discovery starts from the top because kernel will
* refuse to send packets > current iface mtu.
* this saves us some time and network bw.
*/
onwire_len = max_mtu_len;
restart:
/*
* prevent a race when the interface mtu is changed _exactly_ during
* the discovery process, which is complex to detect. It is easier
* to wait for the next loop.
* 30 is not an arbitrary value: bisecting from 576 to 128000 doesn't
* take more than 18/19 steps.
*/
if (failsafe == 30) {
log_err(knet_h, KNET_SUB_PMTUD,
"Aborting PMTUD process: Too many attempts. MTU might have changed during discovery.");
return -1;
} else {
failsafe++;
}
/*
* common to all packets
*/
/*
* calculate the application MTU based on current onwire_len minus ipproto_overhead_len
*/
app_mtu_len = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);
/*
* recalculate the onwire len, which might differ due to
* data padding from the crypto layer.
*/
onwire_len = calc_data_outlen(knet_h, app_mtu_len + KNET_HEADER_ALL_SIZE) + ipproto_overhead_len;
/*
* calculate the size of what we need to send to sendto(2).
* see also onwire.c for packet format explanation.
*/
data_len = app_mtu_len + knet_h->sec_hash_size + knet_h->sec_salt_size + KNET_HEADER_ALL_SIZE;
if (knet_h->crypto_instance) {
if (data_len < (knet_h->sec_hash_size + knet_h->sec_salt_size) + 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "Aborting PMTUD process: link mtu smaller than crypto header detected (link might have been disconnected)");
return -1;
}
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->pmtudbuf,
data_len - (knet_h->sec_hash_size + knet_h->sec_salt_size),
knet_h->pmtudbuf_crypt,
(ssize_t *)&data_len) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to crypto pmtud packet");
return -1;
}
outbuf = knet_h->pmtudbuf_crypt;
knet_h->stats_extra.tx_crypt_pmtu_packets++;
} else {
knet_h->pmtudbuf->khp_pmtud_size = onwire_len;
}
/* link has gone down, aborting pmtud */
if (dst_link->status.connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (dst_link->transport_connected != 1) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD detected host (%u) link (%u) has been disconnected", dst_host->host_id, dst_link->link_id);
return -1;
}
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
return -1;
}
if (knet_h->pmtud_abort) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
errno = EDEADLK;
return -1;
}
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_PMTUD, "Unable to get TX mutex lock: %s", strerror(savederrno));
return -1;
}
retry:
if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &dst_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(dst_link->outsock, outbuf, data_len, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
/*
* we cannot hold a lock on kmtu_mutex between resetting
* knet_h->kernel_mtu here and the point below where it's used.
* use_kernel_mtu records whether knet_h->kernel_mtu was
* set to 0, so that we know we can trust its value later.
*/
use_kernel_mtu = 0;
if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
use_kernel_mtu = 1;
knet_h->kernel_mtu = 0;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
kernel_mtu = 0;
err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet (sendto): %d %s", savederrno, strerror(savederrno));
pthread_mutex_unlock(&knet_h->tx_mutex);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
dst_link->status.stats.tx_pmtu_errors++;
return -1;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
dst_link->status.stats.tx_pmtu_retries++;
goto retry;
break;
}
pthread_mutex_unlock(&knet_h->tx_mutex);
if (len != (ssize_t)data_len) {
if (savederrno == EMSGSIZE) {
/*
* we cannot hold a lock on kmtu_mutex between resetting
* knet_h->kernel_mtu and here.
* use_kernel_mtu records whether knet_h->kernel_mtu was
* set to 0 previously, so that we know we can trust its value now.
*/
if (use_kernel_mtu) {
use_kernel_mtu = 0;
if (pthread_mutex_lock(&knet_h->kmtu_mutex) == 0) {
kernel_mtu = knet_h->kernel_mtu;
pthread_mutex_unlock(&knet_h->kmtu_mutex);
}
}
if (kernel_mtu > 0) {
dst_link->last_bad_mtu = kernel_mtu + 1;
} else {
dst_link->last_bad_mtu = onwire_len;
}
} else {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to send pmtu packet len: %zu err: %s", onwire_len, strerror(savederrno));
}
} else {
dst_link->last_sent_mtu = onwire_len;
dst_link->last_recv_mtu = 0;
dst_link->status.stats.tx_pmtu_packets++;
dst_link->status.stats.tx_pmtu_bytes += data_len;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
+ /*
+ * non-fatal: we can wait for the next round to reduce the
+ * multiplier
+ */
+ if (clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_start_ts) < 0) {
+ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
+ pmtud_crypto_reduce = 0;
+ }
+
/*
* set PMTUd reply timeout to match pong_timeout on a given link
*
* math: internally pong_timeout is expressed in microseconds, while
* the public API exports milliseconds, so be careful with the zeroes here.
* the loop is necessary because we are grabbing the current time just above
* and adding values to it that could overflow into seconds.
*/
if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get backoff_mutex");
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return -1;
}
if (knet_h->crypto_instance) {
/*
* crypto, under pressure, is a royal PITA: under heavy load the
* remote node can take much longer than a normal pong_timeout to
* answer, so scale the wait by the per-link multiplier.
*/
- pong_timeout_adj_tmp = dst_link->pong_timeout_adj * 2;
+ pong_timeout_adj_tmp = dst_link->pong_timeout_adj * dst_link->pmtud_crypto_timeout_multiplier;
} else {
pong_timeout_adj_tmp = dst_link->pong_timeout_adj;
}
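/*
* e.g. pong_timeout_adj_tmp == 2500000 (2.5s expressed in microseconds)
* adds 2 whole seconds and 500000000 nanoseconds; the loop below folds
* any nanosecond carry back into tv_sec.
*/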
ts.tv_sec += pong_timeout_adj_tmp / 1000000;
ts.tv_nsec += (((pong_timeout_adj_tmp) % 1000000) * 1000);
while (ts.tv_nsec >= 1000000000) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
pthread_mutex_unlock(&knet_h->backoff_mutex);
knet_h->pmtud_waiting = 1;
ret = pthread_cond_timedwait(&knet_h->pmtud_cond, &knet_h->pmtud_mutex, &ts);
knet_h->pmtud_waiting = 0;
if (knet_h->pmtud_abort) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
errno = EDEADLK;
return -1;
}
if (shutdown_in_progress(knet_h)) {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted. shutdown in progress");
return -1;
}
if (ret) {
if (ret == ETIMEDOUT) {
+ if ((knet_h->crypto_instance) && (dst_link->pmtud_crypto_timeout_multiplier < KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX)) {
+ dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier * 2;
+ pmtud_crypto_reduce = 0;
+ log_debug(knet_h, KNET_SUB_PMTUD,
+ "Increasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
+ dst_link->pmtud_crypto_timeout_multiplier,
+ dst_host->host_id,
+ dst_link->link_id);
+ pthread_mutex_unlock(&knet_h->pmtud_mutex);
+ goto restart;
+ }
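+ /*
+ * reaching this point with crypto enabled means the multiplier is
+ * already at KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MAX, so the
+ * missing reply can no longer be blamed on crypto load alone: fall
+ * through and warn about a possible real MTU misconfiguration.
+ */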
if (!warn_once) {
log_warn(knet_h, KNET_SUB_PMTUD,
"possible MTU misconfiguration detected. "
"kernel is reporting MTU: %u bytes for "
"host %u link %u but the other node is "
"not acknowledging packets of this size. ",
dst_link->last_sent_mtu,
dst_host->host_id,
dst_link->link_id);
log_warn(knet_h, KNET_SUB_PMTUD,
"This can be caused by this node interface MTU "
"too big or a network device that does not "
"support or has been misconfigured to manage MTU "
"of this size, or packet loss. knet will continue "
"to run but performances might be affected.");
warn_once = 1;
}
} else {
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (mutex_retry_limit == 3) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD aborted, unable to get mutex lock");
return -1;
}
mutex_retry_limit++;
goto restart;
}
}
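+ /*
+ * reduce the multiplier only when the reply came back well within the
+ * inflated timeout: timediff is in nanoseconds, pong_timeout_adj_tmp in
+ * microseconds (hence the * 1000), and answering in less than half the
+ * current timeout is taken as a sign that the crypto load has eased.
+ */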
+ if ((knet_h->crypto_instance) && (pmtud_crypto_reduce == 1) &&
+ (dst_link->pmtud_crypto_timeout_multiplier > KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN)) {
+ if (!clock_gettime(CLOCK_MONOTONIC, &pmtud_crypto_stop_ts)) {
+ timespec_diff(pmtud_crypto_start_ts, pmtud_crypto_stop_ts, &timediff);
+ if (((pong_timeout_adj_tmp * 1000) / 2) > timediff) {
+ dst_link->pmtud_crypto_timeout_multiplier = dst_link->pmtud_crypto_timeout_multiplier / 2;
+ log_debug(knet_h, KNET_SUB_PMTUD,
+ "Decreasing PMTUd response timeout multiplier to (%u) for host %u link: %u",
+ dst_link->pmtud_crypto_timeout_multiplier,
+ dst_host->host_id,
+ dst_link->link_id);
+ }
+ } else {
+ log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get current time: %s", strerror(errno));
+ }
+ }
+
if ((dst_link->last_recv_mtu != onwire_len) || (ret)) {
dst_link->last_bad_mtu = onwire_len;
} else {
int found_mtu = 0;
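/*
* termination test for the search: without crypto we stop when the probe
* hit the ceiling or the window between the known-good and known-bad
* sizes has collapsed; with a block cipher the onwire size only grows in
* sec_block_size steps, so we stop as soon as one more block would reach
* the ceiling or a known-bad size.
*/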
if (knet_h->sec_block_size) {
if ((onwire_len + knet_h->sec_block_size >= max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu <= (onwire_len + knet_h->sec_block_size)))) {
found_mtu = 1;
}
} else {
if ((onwire_len == max_mtu_len) ||
((dst_link->last_bad_mtu) && (dst_link->last_bad_mtu == (onwire_len + 1))) ||
(dst_link->last_bad_mtu == dst_link->last_good_mtu)) {
found_mtu = 1;
}
}
if (found_mtu) {
/*
* account for IP overhead, knet headers and crypto in PMTU calculation
*/
dst_link->status.mtu = calc_max_data_outlen(knet_h, onwire_len - ipproto_overhead_len);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return 0;
}
dst_link->last_good_mtu = onwire_len;
}
}
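/*
* pick the next probe size: jump straight to the kernel-reported MTU when
* EMSGSIZE gave us one, otherwise bisect between the last good and last
* bad onwire sizes.
*/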
if (kernel_mtu) {
onwire_len = kernel_mtu;
} else {
onwire_len = (dst_link->last_good_mtu + dst_link->last_bad_mtu) / 2;
}
pthread_mutex_unlock(&knet_h->pmtud_mutex);
goto restart;
}
static int _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int force_run)
{
uint8_t saved_valid_pmtud;
unsigned int saved_pmtud;
struct timespec clock_now;
unsigned long long diff_pmtud, interval;
if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get monotonic clock");
return 0;
}
if (!force_run) {
interval = knet_h->pmtud_interval * 1000000000llu; /* nanoseconds */
timespec_diff(dst_link->pmtud_last, clock_now, &diff_pmtud);
if (diff_pmtud < interval) {
return dst_link->has_valid_mtu;
}
}
/*
* status.proto_overhead should include all IP/(UDP|SCTP)/knet headers
*
* please note that it is not the same as link->proto_overhead, which
* (at the moment) includes only the UDP or SCTP overhead.
*/
switch (dst_link->dst_addr.ss_family) {
case AF_INET6:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V6 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
break;
case AF_INET:
dst_link->status.proto_overhead = KNET_PMTUD_OVERHEAD_V4 + dst_link->proto_overhead + KNET_HEADER_ALL_SIZE + knet_h->sec_hash_size + knet_h->sec_salt_size;
break;
}
saved_pmtud = dst_link->status.mtu;
saved_valid_pmtud = dst_link->has_valid_mtu;
log_debug(knet_h, KNET_SUB_PMTUD, "Starting PMTUD for host: %u link: %u", dst_host->host_id, dst_link->link_id);
errno = 0;
if (_handle_check_link_pmtud(knet_h, dst_host, dst_link) < 0) {
if (errno == EDEADLK) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD for host: %u link: %u has been rescheduled", dst_host->host_id, dst_link->link_id);
dst_link->status.mtu = saved_pmtud;
dst_link->has_valid_mtu = saved_valid_pmtud;
errno = EDEADLK;
return dst_link->has_valid_mtu;
}
dst_link->has_valid_mtu = 0;
} else {
dst_link->has_valid_mtu = 1;
if (dst_link->has_valid_mtu) {
if ((saved_pmtud) && (saved_pmtud != dst_link->status.mtu)) {
log_info(knet_h, KNET_SUB_PMTUD, "PMTUD link change for host: %u link: %u from %u to %u",
dst_host->host_id, dst_link->link_id, saved_pmtud, dst_link->status.mtu);
}
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUD completed for host: %u link: %u current link mtu: %u",
dst_host->host_id, dst_link->link_id, dst_link->status.mtu);
/*
* record pmtud_last after the PMTUd process is done (refreshing the
* clock if we can), because discovery can take a very long time.
*/
dst_link->pmtud_last = clock_now;
if (!clock_gettime(CLOCK_MONOTONIC, &clock_now)) {
dst_link->pmtud_last = clock_now;
}
}
}
if (saved_valid_pmtud != dst_link->has_valid_mtu) {
_host_dstcache_update_sync(knet_h, dst_host);
}
return dst_link->has_valid_mtu;
}
void *_handle_pmtud_link_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;
unsigned int have_mtu;
unsigned int lower_mtu;
int link_has_mtu;
int force_run = 0;
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STARTED);
knet_h->data_mtu = calc_min_mtu(knet_h);
/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);
while (!shutdown_in_progress(knet_h)) {
usleep(KNET_THREADS_TIMERES);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
continue;
}
knet_h->pmtud_abort = 0;
knet_h->pmtud_running = 1;
force_run = knet_h->pmtud_forcerun;
knet_h->pmtud_forcerun = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
if (force_run) {
log_debug(knet_h, KNET_SUB_PMTUD, "PMTUd request to rerun has been received");
}
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get read lock");
continue;
}
lower_mtu = KNET_PMTUD_SIZE_V4;
have_mtu = 0;
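/*
* walk every usable link of every host: the global data MTU is the
* smallest per-link MTU found, and the notify callback below fires only
* when that minimum actually changes.
*/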
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
dst_link = &dst_host->link[link_idx];
if ((dst_link->status.enabled != 1) ||
(dst_link->status.connected != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK) ||
(!dst_link->last_ping_size) ||
((dst_link->dynamic == KNET_LINK_DYNIP) &&
(dst_link->status.dynconnected != 1)))
continue;
link_has_mtu = _handle_check_pmtud(knet_h, dst_host, dst_link, force_run);
if (errno == EDEADLK) {
goto out_unlock;
}
if (link_has_mtu) {
have_mtu = 1;
if (dst_link->status.mtu < lower_mtu) {
lower_mtu = dst_link->status.mtu;
}
}
}
}
if (have_mtu) {
if (knet_h->data_mtu != lower_mtu) {
knet_h->data_mtu = lower_mtu;
log_info(knet_h, KNET_SUB_PMTUD, "Global data MTU changed to: %u", knet_h->data_mtu);
if (knet_h->pmtud_notify_fn) {
knet_h->pmtud_notify_fn(knet_h->pmtud_notify_fn_private_data,
knet_h->data_mtu);
}
}
}
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_PMTUD, "Unable to get mutex lock");
} else {
knet_h->pmtud_running = 0;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}
}
set_thread_status(knet_h, KNET_THREAD_PMTUD, KNET_THREAD_STOPPED);
return NULL;
}