Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/libknet/internals.h b/libknet/internals.h
index 7527c3fd..48f25949 100644
--- a/libknet/internals.h
+++ b/libknet/internals.h
@@ -1,437 +1,437 @@
/*
* Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_INTERNALS_H__
#define __KNET_INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include <pthread.h>
#include <stddef.h>
#include <qb/qblist.h>
#include "libknet.h"
#include "onwire.h"
#include "compat.h"
#include "threads_common.h"
#define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
/*
* Size of threads stack. Value is choosen by experimenting, how much is needed
* to sucesfully finish test suite, and at the time of writing patch it was
* ~300KiB. To have some room for future enhancement it is increased
* by factor of 3 and rounded.
*/
#define KNET_THREAD_STACK_SIZE (1024 * 1024)
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward because of circular dependancy */
struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned long long pong_timeout_adj; /* timeout adjusted for latency */
uint8_t pong_timeout_backoff; /* see link.h for definition */
unsigned int latency_max_samples; /* precision */
unsigned int latency_cur_samples;
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
uint64_t flags;
+ void *access_list_match_entry_head; /* pointer to access list match_entry list head */
/* status */
struct knet_link_status status;
/* internals */
pthread_mutex_t link_stats_mutex; /* used to update link stats */
uint8_t link_id;
uint8_t transport; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
int outsock;
unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/
unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused
with stats.proto_overhead that includes also knet headers
and crypto headers */
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */
uint8_t has_valid_mtu;
};
#define KNET_DEFRAG_BUFFERS 32
#define KNET_CBUFFER_SIZE 4096
struct knet_host_defrag_buf {
char buf[KNET_DATABUFSIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */
uint8_t last_first; /* special case if we receive the last fragment first */
ssize_t frag_size; /* normal frag size (not the last one) */
ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
struct knet_host {
/* required */
knet_node_id_t host_id;
/* configurable */
uint8_t link_handler_policy;
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/* internals */
char circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t rx_seq_num;
seq_num_t untimed_rx_seq_num;
seq_num_t timed_rx_seq_num;
uint8_t got_data;
/* defrag/reassembly buffers */
struct knet_host_defrag_buf defrag_buf[KNET_DEFRAG_BUFFERS];
char circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next;
};
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
struct knet_fd_trackers {
uint8_t transport; /* transport type (UDP/SCTP...) */
uint8_t data_type; /* internal use for transport to define what data are associated
* with this fd */
socklen_t sockaddr_len; /* Size of sockaddr_in[6] structure for this socket */
void *data; /* pointer to the data */
- void *access_list_match_entry_head; /* pointer to access list match_entry list head */
};
#define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
#define KNET_MAX_COMPRESS_METHODS UINT8_MAX
#define KNET_MAX_CRYPTO_INSTANCES 2
struct knet_handle_stats_extra {
uint64_t tx_crypt_pmtu_packets;
uint64_t tx_crypt_pmtu_reply_packets;
uint64_t tx_crypt_ping_packets;
uint64_t tx_crypt_pong_packets;
};
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */
unsigned int pmtud_interval;
unsigned int manual_mtu;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
struct knet_handle_stats stats;
struct knet_handle_stats_extra stats_extra;
pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */
uint32_t reconnect_int;
knet_node_id_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_header *recv_from_sock_buf;
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
uint8_t threads_flush_queue[KNET_THREAD_MAX];
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */
uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */
int pmtud_waiting;
int pmtud_running;
int pmtud_forcerun;
int pmtud_abort;
struct crypto_instance *crypto_instance[KNET_MAX_CRYPTO_INSTANCES + 1]; /* store an extra pointer to allow 0|1|2 values without too much magic in the code */
uint8_t crypto_in_use_config; /* crypto config to use for TX */
uint8_t crypto_only; /* allow only crypto (1) or also clear (0) traffic */
size_t sec_block_size;
size_t sec_hash_size;
size_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
int compress_model;
int compress_level;
size_t compress_threshold;
void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */
unsigned char *recv_from_links_buf_decompress;
unsigned char *send_to_links_buf_compress;
seq_num_t tx_seq_num;
pthread_mutex_t tx_seq_num_mutex;
uint8_t has_loop_link;
uint8_t loop_link;
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
uint64_t flags;
struct qb_list_head list;
const char *plugin_path;
};
struct handle_tracker {
struct qb_list_head head;
};
/*
* lib_config stuff shared across everything
*/
extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */
extern pthread_mutex_t handle_config_mutex;
extern struct handle_tracker handle_list;
extern uint8_t handle_list_init;
int _is_valid_handle(knet_handle_t knet_h);
int _init_shlib_tracker(knet_handle_t knet_h);
void _fini_shlib_tracker(void);
/*
* NOTE: every single operation must be implementend
* for every protocol.
*/
/*
* for now knet supports only IP protocols (udp/sctp)
* in future there might be others like ARP
* or TIPC.
* keep this around as transport information
* to use for access lists and other operations
*/
#define TRANSPORT_PROTO_LOOPBACK 0
#define TRANSPORT_PROTO_IP_PROTO 1
/*
* some transports like SCTP can filter incoming
* connections before knet has to process
* any packets.
* GENERIC_ACL -> packet has to be read and filterted
* PROTO_ACL -> transport provides filtering at lower levels
* and packet does not need to be processed
*/
typedef enum {
USE_NO_ACL,
USE_GENERIC_ACL,
USE_PROTO_ACL
} transport_acl;
/*
* make it easier to map values in transports.c
*/
#define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0
#define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1
typedef struct knet_transport_ops {
/*
* transport generic information
*/
const char *transport_name;
const uint8_t transport_id;
const uint8_t built_in;
uint8_t transport_protocol;
transport_acl transport_acl_type;
/*
* connection oriented protocols like SCTP
* don´t need dst_addr in sendto calls and
* on some OSes are considered EINVAL.
*/
uint8_t transport_is_connection_oriented;
uint32_t transport_mtu_overhead;
/*
* transport init must allocate the new transport
* and perform all internal initializations
* (threads, lists, etc).
*/
int (*transport_init)(knet_handle_t knet_h);
/*
* transport free must releases _all_ resources
* allocated by tranport_init
*/
int (*transport_free)(knet_handle_t knet_h);
/*
* link operations should take care of all the
* sockets and epoll management for a given link/transport set
* transport_link_disable should return err = -1 and errno = EBUSY
* if listener is still in use, and any other errno in case
* the link cannot be disabled.
*
* set_config/clear_config are invoked in global write lock context
*/
int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);
/*
* transport callback for incoming dynamic connections
* this is called in global read lock context
*/
int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);
/*
* return the fd to use for access lists
*/
int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link);
/*
* per transport error handling of recvmmsg
* (see _handle_recv_from_links comments for details)
*/
/*
* transport_rx_sock_error is invoked when recvmmsg returns <= 0
*
* transport_rx_sock_error is invoked with both global_rdlock
*/
int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* transport_tx_sock_error is invoked with global_rwlock and
* it's invoked when sendto or sendmmsg returns =< 0
*
* it should return:
* -1 on internal error
* 0 ignore error and continue
* 1 retry
* any sleep or wait action should happen inside the transport code
*/
int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* this function is called on _every_ received packet
* to verify if the packet is data or internal protocol error handling
*
* it should return:
* -1 on error
* 0 packet is not data and we should continue the packet process loop
* 1 packet is not data and we should STOP the packet process loop
* 2 packet is data and should be parsed as such
*
* transport_rx_is_data is invoked with both global_rwlock
* and fd_tracker read lock (from RX thread)
*/
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
/*
* this function is called by links.c when a link down event is recorded
* to notify the transport that packets are not going through, and give
* transport the opportunity to take actions.
*/
int (*transport_link_is_down)(knet_handle_t knet_h, struct knet_link *link);
} knet_transport_ops_t;
struct pretty_names {
const char *name;
uint8_t val;
};
#endif
diff --git a/libknet/links.c b/libknet/links.c
index 2e41bdb1..8cb1621b 100644
--- a/libknet/links.c
+++ b/libknet/links.c
@@ -1,1513 +1,1513 @@
/*
* Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>
#include "netutils.h"
#include "internals.h"
#include "logging.h"
#include "links.h"
#include "transports.h"
#include "host.h"
#include "threads_common.h"
#include "links_acl.h"
int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled, unsigned int connected, unsigned int lock_stats)
{
struct knet_host *host = knet_h->host_index[host_id];
struct knet_link *link = &host->link[link_id];
int savederrno = 0;
if ((link->status.enabled == enabled) &&
(link->status.connected == connected))
return 0;
link->status.enabled = enabled;
link->status.connected = connected;
_host_dstcache_update_async(knet_h, knet_h->host_index[host_id]);
if ((link->status.dynconnected) &&
(!link->status.connected)) {
link->status.dynconnected = 0;
}
if (!connected) {
transport_link_is_down(knet_h, link);
}
if (lock_stats) {
savederrno = pthread_mutex_lock(&link->link_stats_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
host_id, link_id, strerror(savederrno));
errno = savederrno;
return -1;
}
}
if (connected) {
time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]);
link->status.stats.up_count++;
if (++link->status.stats.last_up_time_index >= MAX_LINK_EVENTS) {
link->status.stats.last_up_time_index = 0;
}
} else {
time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]);
link->status.stats.down_count++;
if (++link->status.stats.last_down_time_index >= MAX_LINK_EVENTS) {
link->status.stats.last_down_time_index = 0;
}
}
if (lock_stats) {
pthread_mutex_unlock(&link->link_stats_mutex);
}
return 0;
}
void _link_clear_stats(knet_handle_t knet_h)
{
struct knet_host *host;
struct knet_link *link;
uint32_t host_id;
uint8_t link_id;
for (host_id = 0; host_id < KNET_MAX_HOST; host_id++) {
host = knet_h->host_index[host_id];
if (!host) {
continue;
}
for (link_id = 0; link_id < KNET_MAX_LINK; link_id++) {
link = &host->link[link_id];
memset(&link->status.stats, 0, sizeof(struct knet_link_stats));
}
}
}
int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint64_t flags)
{
int savederrno = 0, err = 0, i, wipelink = 0, link_idx;
struct knet_host *host, *tmp_host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (dst_addr && (src_addr->ss_family != dst_addr->ss_family)) {
log_err(knet_h, KNET_SUB_LINK, "Source address family does not match destination address family");
errno = EINVAL;
return -1;
}
if (transport >= KNET_MAX_TRANSPORTS) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
if (knet_h->host_id == host_id && knet_h->has_loop_link) {
log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) {
for (i=0; i<KNET_MAX_LINK; i++) {
if (host->link[i].configured) {
log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured.");
err = -1;
savederrno = EINVAL;
goto exit_unlock;
}
}
}
link = &host->link[link_id];
if (link->configured != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err =-1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/*
* errors happening after this point should trigger
* a memset of the link
*/
wipelink = 1;
copy_sockaddr(&link->src_addr, src_addr);
err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage),
link->status.src_ipaddr, KNET_MAX_HOST_LEN,
link->status.src_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u source addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
} else {
link->dynamic = KNET_LINK_STATIC;
copy_sockaddr(&link->dst_addr, dst_addr);
err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage),
link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
link->status.dst_port, KNET_MAX_PORT_LEN);
if (err) {
if (err == EAI_SYSTEM) {
savederrno = errno;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, strerror(savederrno));
} else {
savederrno = EINVAL;
log_warn(knet_h, KNET_SUB_LINK,
"Unable to resolve host: %u link: %u destination addr/port: %s",
host_id, link_id, gai_strerror(err));
}
err = -1;
goto exit_unlock;
}
}
link->pmtud_crypto_timeout_multiplier = KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN;
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */
link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */
link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */
link->latency_max_samples = KNET_LINK_DEFAULT_PING_PRECISION;
link->latency_cur_samples = 0;
link->flags = flags;
/*
* check for DYNIP vs STATIC collisions.
* example: link0 is static, user attempts to configure link1 as dynamic with the same source
* address/port.
* This configuration is invalid and would cause ACL collisions.
*/
for (tmp_host = knet_h->host_head; tmp_host != NULL; tmp_host = tmp_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&tmp_host->link[link_idx] == link)
continue;
if ((!memcmp(&tmp_host->link[link_idx].src_addr, &link->src_addr, sizeof(struct sockaddr_storage))) &&
(tmp_host->link[link_idx].dynamic != link->dynamic)) {
savederrno = EINVAL;
err = -1;
log_err(knet_h, KNET_SUB_LINK, "Failed to configure host %u link %u dyn %u. Conflicts with host %u link %u dyn %u: %s",
host_id, link_id, link->dynamic, tmp_host->host_id, link_idx, tmp_host->link[link_idx].dynamic, strerror(savederrno));
goto exit_unlock;
}
}
}
savederrno = pthread_mutex_init(&link->link_stats_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to initialize link stats mutex: %s", strerror(savederrno));
err = -1;
goto exit_unlock;
}
if (transport_link_set_config(knet_h, link, transport) < 0) {
savederrno = errno;
err = -1;
goto exit_transport_err;
}
/*
* we can only configure default access lists if we know both endpoints
* and the protocol uses GENERIC_ACL, otherwise the protocol has
* to setup their own access lists above in transport_link_set_config.
*/
if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
(link->dynamic == KNET_LINK_STATIC)) {
log_debug(knet_h, KNET_SUB_LINK, "Configuring default access lists for host: %u link: %u socket: %d",
host_id, link_id, link->outsock);
- if ((check_add(knet_h, link->outsock, transport, -1,
+ if ((check_add(knet_h, link, -1,
&link->dst_addr, &link->dst_addr,
CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
log_warn(knet_h, KNET_SUB_LINK, "Failed to configure default access lists for host: %u link: %u", host_id, link_id);
savederrno = errno;
err = -1;
goto exit_acl_error;
}
}
/*
* no errors should happen after link is configured
*/
link->configured = 1;
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured",
host_id, link_id);
if (transport == KNET_TRANSPORT_LOOPBACK) {
knet_h->has_loop_link = 1;
knet_h->loop_link = link_id;
host->status.reachable = 1;
link->status.mtu = KNET_PMTUD_SIZE_V6;
} else {
/*
* calculate the minimum MTU that is safe to use,
* based on RFCs and that each network device should
* be able to support without any troubles
*/
if (link->dynamic == KNET_LINK_STATIC) {
/*
* with static link we can be more precise than using
* the generic calc_min_mtu()
*/
switch (link->dst_addr.ss_family) {
case AF_INET6:
link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V6 - (KNET_PMTUD_OVERHEAD_V6 + link->proto_overhead));
break;
case AF_INET:
link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V4 - (KNET_PMTUD_OVERHEAD_V4 + link->proto_overhead));
break;
}
} else {
/*
* for dynamic links we start with the minimum MTU
* possible and PMTUd will kick in immediately
* after connection status is 1
*/
link->status.mtu = calc_min_mtu(knet_h);
}
link->has_valid_mtu = 1;
}
exit_acl_error:
/*
* if creating access lists has error, we only need to clean
* the transport and the stuff below.
*/
if (err < 0) {
if ((transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
log_warn(knet_h, KNET_SUB_LINK, "Failed to deconfigure transport for host %u link %u: %s", host_id, link_id, strerror(errno));
}
}
exit_transport_err:
/*
* if transport has errors, transport will clean after itself
* and we only need to clean the mutex
*/
if (err < 0) {
pthread_mutex_destroy(&link->link_stats_mutex);
}
exit_unlock:
/*
* re-init the link on error
*/
if ((err < 0) && (wipelink)) {
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *transport,
struct sockaddr_storage *src_addr,
struct sockaddr_storage *dst_addr,
uint8_t *dynamic,
uint64_t *flags)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!src_addr) {
errno = EINVAL;
return -1;
}
if (!dynamic) {
errno = EINVAL;
return -1;
}
if (!transport) {
errno = EINVAL;
return -1;
}
if (!flags) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) {
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}
memmove(src_addr, &link->src_addr, sizeof(struct sockaddr_storage));
*transport = link->transport;
*flags = link->flags;
if (link->dynamic == KNET_LINK_STATIC) {
*dynamic = 0;
memmove(dst_addr, &link->dst_addr, sizeof(struct sockaddr_storage));
} else {
*dynamic = 1;
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
int sock;
uint8_t transport;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (link->configured != 1) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled != 0) {
err = -1;
savederrno = EBUSY;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
/*
* remove well known access lists here.
* After the transport has done clearing the config,
* then we can remove any leftover access lists if the link
* is no longer in use.
*/
if ((transport_get_acl_type(knet_h, link->transport) == USE_GENERIC_ACL) &&
(link->dynamic == KNET_LINK_STATIC)) {
- if ((check_rm(knet_h, link->outsock, link->transport,
+ if ((check_rm(knet_h, link,
&link->dst_addr, &link->dst_addr,
CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
err = -1;
savederrno = errno;
log_err(knet_h, KNET_SUB_LINK, "Host %u link %u: unable to remove default access list",
host_id, link_id);
goto exit_unlock;
}
}
/*
* cache it for later as we don't know if the transport
* will clear link info during clear_config.
*/
sock = link->outsock;
transport = link->transport;
if ((transport_link_clear_config(knet_h, link) < 0) &&
(errno != EBUSY)) {
savederrno = errno;
err = -1;
goto exit_unlock;
}
/*
* remove any other access lists when the socket is no
* longer in use by the transport.
*/
if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) &&
(knet_h->knet_transport_fd_tracker[sock].transport == KNET_MAX_TRANSPORTS)) {
- check_rmall(knet_h, sock, transport);
+ check_rmall(knet_h, link);
}
pthread_mutex_destroy(&link->link_stats_mutex);
memset(link, 0, sizeof(struct knet_link));
link->link_id = link_id;
if (knet_h->has_loop_link && host_id == knet_h->host_id && link_id == knet_h->loop_link) {
knet_h->has_loop_link = 0;
if (host->active_link_entries == 0) {
host->status.reachable = 0;
}
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (enabled > 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->status.enabled == enabled) {
err = 0;
goto exit_unlock;
}
err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected, 0);
savederrno = errno;
if (enabled) {
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled",
host_id, link_id);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
unsigned int *enabled)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!enabled) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*enabled = link->status.enabled;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (pong_count < 1) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->pong_count = pong_count;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u pong count update: %u",
host_id, link_id, link->pong_count);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *pong_count)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!pong_count) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*pong_count = link->pong_count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t interval, time_t timeout, unsigned int precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = ENOSYS;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
link->ping_interval = interval * 1000; /* microseconds */
link->pong_timeout = timeout * 1000; /* microseconds */
link->latency_max_samples = precision;
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u",
host_id, link_id, link->ping_interval, link->pong_timeout, precision);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
time_t *interval, time_t *timeout, unsigned int *precision)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!interval) {
errno = EINVAL;
return -1;
}
if (!timeout) {
errno = EINVAL;
return -1;
}
if (!precision) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*interval = link->ping_interval / 1000; /* microseconds */
*timeout = link->pong_timeout / 1000;
*precision = link->latency_max_samples;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
uint8_t old_priority;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
old_priority = link->priority;
if (link->priority == priority) {
err = 0;
goto exit_unlock;
}
link->priority = priority;
if (_host_dstcache_update_sync(knet_h, host)) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_LINK,
"Unable to update link priority (host: %u link: %u priority: %u): %s",
host_id, link_id, link->priority, strerror(savederrno));
link->priority = old_priority;
err = -1;
goto exit_unlock;
}
log_debug(knet_h, KNET_SUB_LINK,
"host: %u link: %u priority set to: %u",
host_id, link_id, link->priority);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t *priority)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!priority) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
*priority = link->priority;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id,
uint8_t *link_ids, size_t *link_ids_entries)
{
int savederrno = 0, err = 0, i, count = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (!link_ids) {
errno = EINVAL;
return -1;
}
if (!link_ids_entries) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
for (i = 0; i < KNET_MAX_LINK; i++) {
link = &host->link[i];
if (!link->configured) {
continue;
}
link_ids[count] = i;
count++;
}
*link_ids_entries = count;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct knet_link_status *status, size_t struct_size)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
if (!status) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
savederrno = pthread_mutex_lock(&link->link_stats_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s",
host_id, link_id, strerror(savederrno));
err = -1;
goto exit_unlock;
}
memmove(status, &link->status, struct_size);
pthread_mutex_unlock(&link->link_stats_mutex);
/* Calculate totals - no point in doing this on-the-fly */
status->stats.rx_total_packets =
status->stats.rx_data_packets +
status->stats.rx_ping_packets +
status->stats.rx_pong_packets +
status->stats.rx_pmtu_packets;
status->stats.tx_total_packets =
status->stats.tx_data_packets +
status->stats.tx_ping_packets +
status->stats.tx_pong_packets +
status->stats.tx_pmtu_packets;
status->stats.rx_total_bytes =
status->stats.rx_data_bytes +
status->stats.rx_ping_bytes +
status->stats.rx_pong_bytes +
status->stats.rx_pmtu_bytes;
status->stats.tx_total_bytes =
status->stats.tx_data_bytes +
status->stats.tx_ping_bytes +
status->stats.tx_pong_bytes +
status->stats.tx_pmtu_bytes;
status->stats.tx_total_errors =
status->stats.tx_data_errors +
status->stats.tx_ping_errors +
status->stats.tx_pong_errors +
status->stats.tx_pmtu_errors;
status->stats.tx_total_retries =
status->stats.tx_data_retries +
status->stats.tx_ping_retries +
status->stats.tx_pong_retries +
status->stats.tx_pmtu_retries;
/* Tell the caller our full size in case they have an old version */
status->size = sizeof(struct knet_link_status);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
int knet_link_insert_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
int index,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (!ss1) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) &&
(type != CHECK_TYPE_MASK) &&
(type != CHECK_TYPE_RANGE)) {
errno = EINVAL;
return -1;
}
if ((acceptreject != CHECK_ACCEPT) &&
(acceptreject != CHECK_REJECT)) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
errno = EINVAL;
return -1;
}
if ((type == CHECK_TYPE_RANGE) &&
(ss1->ss_family != ss2->ss_family)) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
- err = check_add(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport, index,
+ err = check_add(knet_h, link, index,
ss1, ss2, type, acceptreject);
savederrno = errno;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_add_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
return knet_link_insert_acl(knet_h, host_id, link_id, -1, ss1, ss2, type, acceptreject);
}
int knet_link_rm_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
struct sockaddr_storage *ss1,
struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (!ss1) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) &&
(type != CHECK_TYPE_MASK) &&
(type != CHECK_TYPE_RANGE)) {
errno = EINVAL;
return -1;
}
if ((acceptreject != CHECK_ACCEPT) &&
(acceptreject != CHECK_REJECT)) {
errno = EINVAL;
return -1;
}
if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) {
errno = EINVAL;
return -1;
}
if ((type == CHECK_TYPE_RANGE) &&
(ss1->ss_family != ss2->ss_family)) {
errno = EINVAL;
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
- err = check_rm(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport,
+ err = check_rm(knet_h, link,
ss1, ss2, type, acceptreject);
savederrno = errno;
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
int knet_link_clear_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id)
{
int savederrno = 0, err = 0;
struct knet_host *host;
struct knet_link *link;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (link_id >= KNET_MAX_LINK) {
errno = EINVAL;
return -1;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
host = knet_h->host_index[host_id];
if (!host) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s",
host_id, strerror(savederrno));
goto exit_unlock;
}
link = &host->link[link_id];
if (!link->configured) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
if (link->dynamic != KNET_LINK_DYNIP) {
err = -1;
savederrno = EINVAL;
log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s",
host_id, link_id, strerror(savederrno));
goto exit_unlock;
}
- check_rmall(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport);
+ check_rmall(knet_h, link);
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = savederrno;
return err;
}
diff --git a/libknet/links_acl.c b/libknet/links_acl.c
index 1612f15e..66faf244 100644
--- a/libknet/links_acl.c
+++ b/libknet/links_acl.c
@@ -1,65 +1,67 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include "internals.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "links_acl.h"
#include "links_acl_ip.h"
#include "links_acl_loopback.h"
static check_ops_t proto_check_modules_cmds[] = {
{ TRANSPORT_PROTO_LOOPBACK, loopbackcheck_validate, loopbackcheck_add, loopbackcheck_rm, loopbackcheck_rmall },
{ TRANSPORT_PROTO_IP_PROTO, ipcheck_validate, ipcheck_addip, ipcheck_rmip, ipcheck_rmall }
};
/*
* all those functions will return errno from the
* protocol specific functions
*/
-int check_add(knet_handle_t knet_h, int sock, uint8_t transport, int index,
+int check_add(knet_handle_t knet_h, struct knet_link *kn_link,
+ int index,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
- return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_add(
- &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head, index,
+ return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_add(
+ &kn_link->access_list_match_entry_head, index,
ss1, ss2, type, acceptreject);
}
-int check_rm(knet_handle_t knet_h, int sock, uint8_t transport,
+int check_rm(knet_handle_t knet_h, struct knet_link *kn_link,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
- return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_rm(
- &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head,
+ return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_rm(
+ &kn_link->access_list_match_entry_head,
ss1, ss2, type, acceptreject);
}
-void check_rmall(knet_handle_t knet_h, int sock, uint8_t transport)
+void check_rmall(knet_handle_t knet_h, struct knet_link *kn_link)
{
- proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_rmall(
- &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head);
+ proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_rmall(
+ &kn_link->access_list_match_entry_head);
}
/*
* return 0 to reject and 1 to accept a packet
*/
-int check_validate(knet_handle_t knet_h, int sock, uint8_t transport, struct sockaddr_storage *checkip)
+int check_validate(knet_handle_t knet_h, struct knet_link *kn_link,
+ struct sockaddr_storage *checkip)
{
- return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_validate(
- &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head, checkip);
+ return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_validate(
+ &kn_link->access_list_match_entry_head, checkip);
}
diff --git a/libknet/links_acl.h b/libknet/links_acl.h
index 9901c956..3b9fd35e 100644
--- a/libknet/links_acl.h
+++ b/libknet/links_acl.h
@@ -1,39 +1,44 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_LINKS_ACL_H__
#define __KNET_LINKS_ACL_H__
#include "internals.h"
typedef struct {
uint8_t transport_proto;
int (*protocheck_validate) (void *fd_tracker_match_entry_head, struct sockaddr_storage *checkip);
int (*protocheck_add) (void *fd_tracker_match_entry_head, int index,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject);
int (*protocheck_rm) (void *fd_tracker_match_entry_head,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject);
void (*protocheck_rmall) (void *fd_tracker_match_entry_head);
} check_ops_t;
-int check_add(knet_handle_t knet_h, int sock, uint8_t transport, int index,
+int check_add(knet_handle_t knet_h, struct knet_link *kn_link,
+ int index,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject);
-int check_rm(knet_handle_t knet_h, int sock, uint8_t transport,
+
+int check_rm(knet_handle_t knet_h, struct knet_link *kn_link,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject);
-void check_rmall(knet_handle_t knet_h, int sock, uint8_t transport);
-int check_validate(knet_handle_t knet_h, int sock, uint8_t transport, struct sockaddr_storage *checkip);
+
+void check_rmall(knet_handle_t knet_h, struct knet_link *kn_link);
+
+int check_validate(knet_handle_t knet_h, struct knet_link *kn_link,
+ struct sockaddr_storage *checkip);
#endif
diff --git a/libknet/links_acl_ip.c b/libknet/links_acl_ip.c
index ea1755f2..8d562c58 100644
--- a/libknet/links_acl_ip.c
+++ b/libknet/links_acl_ip.c
@@ -1,302 +1,305 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include "internals.h"
#include "netutils.h"
#include "logging.h"
#include "transports.h"
#include "links_acl.h"
#include "links_acl_ip.h"
struct ip_acl_match_entry {
check_type_t type;
check_acceptreject_t acceptreject;
struct sockaddr_storage addr1; /* Actual IP address, mask top or low IP */
struct sockaddr_storage addr2; /* high IP address or address bitmask */
struct ip_acl_match_entry *next;
};
/*
* IPv4 See if the address we have matches the current match entry
*/
static int ip_matches_v4(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry)
{
struct sockaddr_in *ip_to_check;
struct sockaddr_in *match1;
struct sockaddr_in *match2;
ip_to_check = (struct sockaddr_in *)checkip;
match1 = (struct sockaddr_in *)&match_entry->addr1;
match2 = (struct sockaddr_in *)&match_entry->addr2;
switch(match_entry->type) {
case CHECK_TYPE_ADDRESS:
if (ip_to_check->sin_addr.s_addr == match1->sin_addr.s_addr)
return 1;
break;
case CHECK_TYPE_MASK:
if ((ip_to_check->sin_addr.s_addr & match2->sin_addr.s_addr) ==
match1->sin_addr.s_addr)
return 1;
break;
case CHECK_TYPE_RANGE:
if ((ntohl(ip_to_check->sin_addr.s_addr) >= ntohl(match1->sin_addr.s_addr)) &&
(ntohl(ip_to_check->sin_addr.s_addr) <= ntohl(match2->sin_addr.s_addr)))
return 1;
break;
}
return 0;
}
/*
* Compare two IPv6 addresses
*/
static int ip6addr_cmp(struct in6_addr *a, struct in6_addr *b)
{
uint64_t a_high, a_low;
uint64_t b_high, b_low;
a_high = ((uint64_t)htonl(a->s6_addr32[0]) << 32) | (uint64_t)htonl(a->s6_addr32[1]);
a_low = ((uint64_t)htonl(a->s6_addr32[2]) << 32) | (uint64_t)htonl(a->s6_addr32[3]);
b_high = ((uint64_t)htonl(b->s6_addr32[0]) << 32) | (uint64_t)htonl(b->s6_addr32[1]);
b_low = ((uint64_t)htonl(b->s6_addr32[2]) << 32) | (uint64_t)htonl(b->s6_addr32[3]);
if (a_high > b_high)
return 1;
if (a_high < b_high)
return -1;
if (a_low > b_low)
return 1;
if (a_low < b_low)
return -1;
return 0;
}
/*
* IPv6 See if the address we have matches the current match entry
*/
static int ip_matches_v6(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry)
{
struct sockaddr_in6 *ip_to_check;
struct sockaddr_in6 *match1;
struct sockaddr_in6 *match2;
int i;
ip_to_check = (struct sockaddr_in6 *)checkip;
match1 = (struct sockaddr_in6 *)&match_entry->addr1;
match2 = (struct sockaddr_in6 *)&match_entry->addr2;
switch(match_entry->type) {
case CHECK_TYPE_ADDRESS:
if (!memcmp(ip_to_check->sin6_addr.s6_addr32, match1->sin6_addr.s6_addr32, sizeof(struct in6_addr)))
return 1;
break;
case CHECK_TYPE_MASK:
/*
* Note that this little loop will quit early if there is a non-match so the
* comparison might look backwards compared to the IPv4 one
*/
for (i=sizeof(struct in6_addr)/4-1; i>=0; i--) {
if ((ip_to_check->sin6_addr.s6_addr32[i] & match2->sin6_addr.s6_addr32[i]) !=
match1->sin6_addr.s6_addr32[i])
return 0;
}
return 1;
case CHECK_TYPE_RANGE:
if ((ip6addr_cmp(&ip_to_check->sin6_addr, &match1->sin6_addr) >= 0) &&
(ip6addr_cmp(&ip_to_check->sin6_addr, &match2->sin6_addr) <= 0))
return 1;
break;
}
return 0;
}
int ipcheck_validate(void *fd_tracker_match_entry_head, struct sockaddr_storage *checkip)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *match_entry = *match_entry_head;
int (*match_fn)(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry);
if (checkip->ss_family == AF_INET) {
match_fn = ip_matches_v4;
} else {
match_fn = ip_matches_v6;
}
while (match_entry) {
if (match_fn(checkip, match_entry)) {
if (match_entry->acceptreject == CHECK_ACCEPT)
return 1;
else
return 0;
}
match_entry = match_entry->next;
}
return 0; /* Default reject */
}
/*
* Routines to manuipulate access lists
*/
void ipcheck_rmall(void *fd_tracker_match_entry_head)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *next_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
while (match_entry) {
next_match_entry = match_entry->next;
free(match_entry);
match_entry = next_match_entry;
}
*match_entry_head = NULL;
}
static struct ip_acl_match_entry *ipcheck_findmatch(struct ip_acl_match_entry **match_entry_head,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry *match_entry = *match_entry_head;
while (match_entry) {
- if ((!memcmp(&match_entry->addr1, ss1, sizeof(struct sockaddr_storage))) &&
- (!memcmp(&match_entry->addr2, ss2, sizeof(struct sockaddr_storage))) &&
+ if ((!memcmp(&match_entry->addr1, ss1, sockaddr_len(ss1))) &&
+ ((match_entry->type == CHECK_TYPE_ADDRESS) ||
+ ((match_entry->type != CHECK_TYPE_ADDRESS) && ss2 && !memcmp(&match_entry->addr2, ss2, sockaddr_len(ss1)))) &&
(match_entry->type == type) &&
(match_entry->acceptreject == acceptreject)) {
return match_entry;
}
match_entry = match_entry->next;
}
return NULL;
}
int ipcheck_rmip(void *fd_tracker_match_entry_head,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *next_match_entry = NULL;
struct ip_acl_match_entry *rm_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
rm_match_entry = ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject);
if (!rm_match_entry) {
errno = ENOENT;
return -1;
}
while (match_entry) {
next_match_entry = match_entry->next;
/*
* we are removing the list head, be careful
*/
if (rm_match_entry == match_entry) {
*match_entry_head = next_match_entry;
free(match_entry);
break;
}
/*
* the next one is the one we need to remove
*/
if (rm_match_entry == next_match_entry) {
match_entry->next = next_match_entry->next;
free(next_match_entry);
break;
}
match_entry = next_match_entry;
}
return 0;
}
int ipcheck_addip(void *fd_tracker_match_entry_head, int index,
struct sockaddr_storage *ss1, struct sockaddr_storage *ss2,
check_type_t type, check_acceptreject_t acceptreject)
{
struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head;
struct ip_acl_match_entry *new_match_entry;
struct ip_acl_match_entry *match_entry = *match_entry_head;
int i = 0;
if (ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject) != NULL) {
errno = EEXIST;
return -1;
}
new_match_entry = malloc(sizeof(struct ip_acl_match_entry));
if (!new_match_entry) {
return -1;
}
copy_sockaddr(&new_match_entry->addr1, ss1);
- copy_sockaddr(&new_match_entry->addr2, ss2);
+ if (ss2) {
+ copy_sockaddr(&new_match_entry->addr2, ss2);
+ }
new_match_entry->type = type;
new_match_entry->acceptreject = acceptreject;
new_match_entry->next = NULL;
if (match_entry) {
/*
* special case for index 0, since we need to update
* the head of the list
*/
if (index == 0) {
*match_entry_head = new_match_entry;
new_match_entry->next = match_entry;
} else {
/*
* find the end of the list or stop at "index"
*/
while (match_entry->next) {
match_entry = match_entry->next;
if (i == index) {
break;
}
i++;
}
/*
* insert if there are more entries in the list
*/
if (match_entry->next) {
new_match_entry->next = match_entry->next;
}
/*
* add if we are at the end
*/
match_entry->next = new_match_entry;
}
} else {
/*
* first entry in the list
*/
*match_entry_head = new_match_entry;
}
return 0;
}
diff --git a/libknet/tests/Makefile.am b/libknet/tests/Makefile.am
index 60c83bda..16e2e89b 100644
--- a/libknet/tests/Makefile.am
+++ b/libknet/tests/Makefile.am
@@ -1,113 +1,117 @@
#
# Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
#
# Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
#
# This software licensed under GPL-2.0+
#
MAINTAINERCLEANFILES = Makefile.in
include $(top_srcdir)/build-aux/check.mk
include $(top_srcdir)/libknet/tests/api-check.mk
EXTRA_DIST = \
api-test-coverage \
api-check.mk
AM_CPPFLAGS = -I$(top_srcdir)/libknet
AM_CFLAGS += $(PTHREAD_CFLAGS) $(libqb_CFLAGS)
LIBS = $(top_builddir)/libknet/libknet.la \
$(PTHREAD_LIBS) $(dl_LIBS)
noinst_HEADERS = \
test-common.h
# the order of those tests is NOT random.
# some functions can only be tested properly after some dependents
# API have been validated upfront.
check_PROGRAMS = \
$(api_checks) \
$(int_checks) \
$(fun_checks)
int_checks = \
int_links_acl_ip_test \
int_timediff_test
fun_checks = \
- fun_config_crypto_test
+ fun_config_crypto_test \
+ fun_acl_check_test
# checks below need to be executed manually
# or with a specifi environment
long_run_checks = \
fun_pmtud_crypto_test
benchmarks = \
knet_bench_test
noinst_PROGRAMS = \
api_knet_handle_new_limit_test \
pckt_test \
$(benchmarks) \
$(long_run_checks) \
$(check_PROGRAMS)
noinst_SCRIPTS = \
api-test-coverage
TESTS = $(check_PROGRAMS)
if INSTALL_TESTS
testsuitedir = $(TESTDIR)
testsuite_PROGRAMS = $(noinst_PROGRAMS)
endif
check-local: check-api-test-coverage
check-api-test-coverage:
chmod u+x $(top_srcdir)/libknet/tests/api-test-coverage
$(top_srcdir)/libknet/tests/api-test-coverage $(top_srcdir) $(top_builddir)
pckt_test_SOURCES = pckt_test.c
int_links_acl_ip_test_SOURCES = int_links_acl_ip.c \
../common.c \
../compat.c \
../logging.c \
../netutils.c \
../threads_common.c \
../onwire.c \
../transports.c \
../transport_common.c \
../transport_loopback.c \
../transport_sctp.c \
../transport_udp.c \
../links_acl.c \
../links_acl_ip.c \
../links_acl_loopback.c \
../lib_config.c
int_timediff_test_SOURCES = int_timediff.c
knet_bench_test_SOURCES = knet_bench.c \
test-common.c \
../common.c \
../logging.c \
../compat.c \
../transport_common.c \
../threads_common.c \
../onwire.c \
../lib_config.c
fun_pmtud_crypto_test_SOURCES = fun_pmtud_crypto.c \
test-common.c \
../onwire.c \
../logging.c \
../threads_common.c \
../lib_config.c
fun_config_crypto_test_SOURCES = fun_config_crypto.c \
test-common.c
+
+fun_acl_check_test_SOURCES = fun_acl_check.c \
+ test-common.c
diff --git a/libknet/tests/api_knet_link_add_acl.c b/libknet/tests/api_knet_link_add_acl.c
index 4f6f8c91..759afdc2 100644
--- a/libknet/tests/api_knet_link_add_acl.c
+++ b/libknet/tests/api_knet_link_add_acl.c
@@ -1,246 +1,246 @@
/*
* Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include "libknet.h"
#include "internals.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
int logfds[2];
struct knet_host *host;
struct knet_link *link;
struct sockaddr_storage lo, lo6;
if (make_local_sockaddr(&lo, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
if (make_local_sockaddr6(&lo6, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
printf("Test knet_link_add_acl incorrect knet_h\n");
if ((!knet_link_add_acl(NULL, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
printf("Test knet_link_add_acl with unconfigured host\n");
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with unconfigured link\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("knet_host_add failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with invalid link\n");
if ((!knet_link_add_acl(knet_h, 1, KNET_MAX_LINK, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with invalid ss1\n");
if ((!knet_link_add_acl(knet_h, 1, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with invalid ss2\n");
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with non matching families\n");
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with wrong check_type\n");
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with wrong acceptreject\n");
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_add_acl with point to point link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_add_acl accepted point to point link or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
printf("Test knet_link_add_acl with dynamic link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list not empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) {
printf("knet_link_add_acl did not accept dynamic link error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
- if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (!link->access_list_match_entry_head) {
printf("match list empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
test();
return PASS;
}
diff --git a/libknet/tests/api_knet_link_clear_acl.c b/libknet/tests/api_knet_link_clear_acl.c
index 9b5cfbb8..d2a6d8d5 100644
--- a/libknet/tests/api_knet_link_clear_acl.c
+++ b/libknet/tests/api_knet_link_clear_acl.c
@@ -1,191 +1,191 @@
/*
* Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include "libknet.h"
#include "internals.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
int logfds[2];
struct knet_host *host;
struct knet_link *link;
struct sockaddr_storage lo;
printf("Test knet_link_clear_acl incorrect knet_h\n");
if ((!knet_link_clear_acl(NULL, 1, 0)) || (errno != EINVAL)) {
printf("knet_link_clear_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
printf("Test knet_link_clear_acl with unconfigured host\n");
if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) {
printf("knet_link_clear_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_clear_acl with unconfigured link\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("knet_host_add failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) {
printf("knet_link_clear_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_clear_acl with invalid link\n");
if ((!knet_link_clear_acl(knet_h, 1, KNET_MAX_LINK)) || (errno != EINVAL)) {
printf("knet_link_clear_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_clear_acl with point to point link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) {
printf("knet_link_clear_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
printf("Test knet_link_clear_acl with dynamic link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list NOT empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) {
printf("knet_link_clear_acl did not accept dynamic link error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
- if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (!link->access_list_match_entry_head) {
printf("match list empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_clear_acl(knet_h, 1, 0) < 0) {
printf("knet_link_clear_acl failed to clear. error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list NOT empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
test();
return PASS;
}
diff --git a/libknet/tests/api_knet_link_insert_acl.c b/libknet/tests/api_knet_link_insert_acl.c
index c0a4de3e..a5de2378 100644
--- a/libknet/tests/api_knet_link_insert_acl.c
+++ b/libknet/tests/api_knet_link_insert_acl.c
@@ -1,246 +1,246 @@
/*
* Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include "libknet.h"
#include "internals.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
int logfds[2];
struct knet_host *host;
struct knet_link *link;
struct sockaddr_storage lo, lo6;
if (make_local_sockaddr(&lo, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
if (make_local_sockaddr6(&lo6, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
printf("Test knet_link_insert_acl incorrect knet_h\n");
if ((!knet_link_insert_acl(NULL, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
printf("Test knet_link_insert_acl with unconfigured host\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with unconfigured link\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("knet_host_add failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with invalid link\n");
if ((!knet_link_insert_acl(knet_h, 1, KNET_MAX_LINK, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with invalid ss1\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with invalid ss2\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with non matching families\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with wrong check_type\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with wrong acceptreject\n");
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_insert_acl with point to point link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_insert_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
printf("Test knet_link_insert_acl with dynamic link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list not empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) {
printf("knet_link_insert_acl did not accept dynamic link error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
- if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (!link->access_list_match_entry_head) {
printf("match list empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
test();
return PASS;
}
diff --git a/libknet/tests/api_knet_link_rm_acl.c b/libknet/tests/api_knet_link_rm_acl.c
index b3300dd1..7af3fd0a 100644
--- a/libknet/tests/api_knet_link_rm_acl.c
+++ b/libknet/tests/api_knet_link_rm_acl.c
@@ -1,256 +1,256 @@
/*
* Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include "libknet.h"
#include "internals.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
int logfds[2];
struct knet_host *host;
struct knet_link *link;
struct sockaddr_storage lo, lo6;
if (make_local_sockaddr(&lo, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
if (make_local_sockaddr6(&lo6, 0) < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
printf("Test knet_link_rm_acl incorrect knet_h\n");
if ((!knet_link_rm_acl(NULL, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
printf("Test knet_link_rm_acl with unconfigured host\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with unconfigured link\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("knet_host_add failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with invalid link\n");
if ((!knet_link_rm_acl(knet_h, 1, KNET_MAX_LINK, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with invalid ss1\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with invalid ss2\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with non matching families\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with wrong check_type\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with wrong acceptreject\n");
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_rm_acl with point to point link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) {
printf("knet_link_rm_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
printf("Test knet_link_rm_acl with dynamic link\n");
if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list not empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) {
printf("Failed to add an access list: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) {
printf("knet_link_rm_acl did not accept dynamic link error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("match list NOT empty!");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
test();
return PASS;
}
diff --git a/libknet/tests/api_knet_link_set_config.c b/libknet/tests/api_knet_link_set_config.c
index 402b51c6..14025649 100644
--- a/libknet/tests/api_knet_link_set_config.c
+++ b/libknet/tests/api_knet_link_set_config.c
@@ -1,313 +1,313 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "libknet.h"
#include "internals.h"
#include "links.h"
#include "netutils.h"
#include "test-common.h"
static void test(void)
{
knet_handle_t knet_h;
struct knet_host *host;
struct knet_link *link;
int logfds[2];
char lo_portstr[32];
struct sockaddr_storage lo, lo6;
struct sockaddr_in *lo_in = (struct sockaddr_in *)&lo;
struct knet_link_status link_status;
if (make_local_sockaddr(&lo, -1) < 0) {
printf("Unable to convert src to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
sprintf(lo_portstr, "%d", ntohs(lo_in->sin_port));
printf("Test knet_link_set_config incorrect knet_h\n");
if ((!knet_link_set_config(NULL, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
printf("Test knet_link_set_config with unconfigured host_id\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid host_id or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with bad transport type\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_MAX_TRANSPORTS, &lo, &lo, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid transport or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with incorrect linkid\n");
if (knet_host_add(knet_h, 1) < 0) {
printf("Unable to add host_id 1: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_set_config(knet_h, 1, KNET_MAX_LINK, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid linkid or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with incorrect src_addr\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, NULL, &lo, 0)) || (errno != EINVAL)) {
printf("knet_link_set_config accepted invalid src_addr or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with conflicting address families\n");
if (make_local_sockaddr6(&lo6, -1) < 0) {
printf("Unable to convert dst to sockaddr: %s\n", strerror(errno));
exit(FAIL);
}
if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo6, 0) == 0) {
printf("knet_link_set_config accepted invalid address families: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with dynamic dst_addr\n");
if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (link->access_list_match_entry_head) {
printf("found access lists for dynamic dst_addr!\n");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((link_status.enabled != 0) ||
(strcmp(link_status.src_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.src_port, lo_portstr)) ||
(knet_h->host_index[1]->link[0].dynamic != KNET_LINK_DYNIP)) {
printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dynamic %u\n",
link_status.enabled, link_status.src_ipaddr, link_status.src_port, knet_h->host_index[1]->link[0].dynamic);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_link_set_config with dynamic link (0) and static link (1)\n");
if (!knet_link_set_config(knet_h, 1, 1, KNET_TRANSPORT_UDP, &lo, &lo, 0) || (errno != EINVAL)) {
printf("knet_link_set_config accepted request mixed static/dynamic request or returned incorrect error: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with already configured link\n");
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0) || (errno != EBUSY))) {
printf("knet_link_set_config accepted request while link configured or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with link enabled\n");
if (knet_link_set_enable(knet_h, 1, 0, 1) < 0) {
printf("Unable to enable link: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0)) || (errno != EBUSY)) {
printf("knet_link_set_config accepted request while link enabled or returned incorrect error: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_set_enable(knet_h, 1, 0, 0) < 0) {
printf("Unable to disable link: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_clear_config(knet_h, 1, 0) < 0) {
printf("Unable to clear link config: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("Test knet_link_set_config with static dst_addr\n");
if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0) < 0) {
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
host = knet_h->host_index[1];
link = &host->link[0];
- if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) {
+ if (!link->access_list_match_entry_head) {
printf("Unable to find default access lists for static dst_addr!\n");
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) {
printf("Unable to get link status: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if ((link_status.enabled != 0) ||
(strcmp(link_status.src_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.src_port, lo_portstr)) ||
(strcmp(link_status.dst_ipaddr, "127.0.0.1")) ||
(strcmp(link_status.dst_port, lo_portstr)) ||
(knet_h->host_index[1]->link[0].dynamic != KNET_LINK_STATIC)) {
printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dst_addr %s dst_port %s dynamic %u\n",
link_status.enabled, link_status.src_ipaddr, link_status.src_port, link_status.dst_ipaddr, link_status.dst_port, knet_h->host_index[1]->link[0].dynamic);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
test();
return PASS;
}
diff --git a/libknet/tests/api_knet_send.c b/libknet/tests/api_knet_send.c
index f4ffb3df..a3b5aa49 100644
--- a/libknet/tests/api_knet_send.c
+++ b/libknet/tests/api_knet_send.c
@@ -1,368 +1,376 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <inttypes.h>
#include "libknet.h"
#include "internals.h"
#include "netutils.h"
#include "test-common.h"
static int private_data;
static void sock_notify(void *pvt_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno)
{
return;
}
static void test(uint8_t transport)
{
knet_handle_t knet_h;
int logfds[2];
int datafd = 0;
int8_t channel = 0;
struct knet_link_status link_status;
char send_buff[KNET_MAX_PACKET_SIZE + 1];
char recv_buff[KNET_MAX_PACKET_SIZE];
ssize_t send_len = 0;
int recv_len = 0;
int savederrno;
struct sockaddr_storage lo;
memset(send_buff, 0, sizeof(send_buff));
printf("Test knet_send incorrect knet_h\n");
if ((!knet_send(NULL, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno));
exit(FAIL);
}
setup_logpipes(logfds);
knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG);
+ if (knet_handle_enable_access_lists(knet_h, 1) < 0) {
+ printf("Cannot enable access lists: %s\n", strerror(errno));
+ knet_handle_free(knet_h);
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ exit(FAIL);
+ }
+
printf("Test knet_send with no send_buff\n");
if ((!knet_send(knet_h, NULL, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid send_buff or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with invalid send_buff len (0)\n");
if ((!knet_send(knet_h, send_buff, 0, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid send_buff len (0) or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with invalid send_buff len (> KNET_MAX_PACKET_SIZE)\n");
if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE + 1, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid send_buff len (> KNET_MAX_PACKET_SIZE) or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with invalid channel (-1)\n");
channel = -1;
if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid channel (-1) or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with invalid channel (KNET_DATAFD_MAX)\n");
channel = KNET_DATAFD_MAX;
if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid channel (KNET_DATAFD_MAX) or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with unconfigured channel\n");
channel = 0;
if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) {
printf("knet_send accepted invalid unconfigured channel or returned incorrect error: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
printf("Test knet_send with valid data\n");
if (knet_handle_enable_access_lists(knet_h, 1) < 0) {
printf("knet_handle_enable_access_lists failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) {
printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
datafd = 0;
channel = -1;
if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) {
printf("knet_handle_add_datafd failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_host_add(knet_h, 1) < 0) {
printf("knet_host_add failed: %s\n", strerror(errno));
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (_knet_link_set_config(knet_h, 1, 0, transport, 0, AF_INET, 0, &lo) < 0 ) {
int exit_status = transport == KNET_TRANSPORT_SCTP && errno == EPROTONOSUPPORT ? SKIP : FAIL;
printf("Unable to configure link: %s\n", strerror(errno));
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(exit_status);
}
if (knet_link_set_enable(knet_h, 1, 0, 1) < 0) {
printf("knet_link_set_enable failed: %s\n", strerror(errno));
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (knet_handle_setfwd(knet_h, 1) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (wait_for_host(knet_h, 1, 10, logfds[0], stdout) < 0) {
printf("timeout waiting for host to be reachable\n");
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
send_len = knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel);
if (send_len <= 0) {
printf("knet_send failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (send_len != sizeof(send_buff) - 1) {
printf("knet_send sent only %zd bytes: %s\n", send_len, strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
flush_logs(logfds[0], stdout);
if (knet_handle_setfwd(knet_h, 0) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
recv_len = knet_recv(knet_h, recv_buff, KNET_MAX_PACKET_SIZE, channel);
savederrno = errno;
if (recv_len != send_len) {
printf("knet_recv received only %d bytes: %s (errno: %d)\n", recv_len, strerror(errno), errno);
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
if ((is_helgrind()) && (recv_len == -1) && (savederrno == EAGAIN)) {
printf("helgrind exception. this is normal due to possible timeouts\n");
exit(PASS);
}
exit(FAIL);
}
if (memcmp(recv_buff, send_buff, KNET_MAX_PACKET_SIZE)) {
printf("recv and send buffers are different!\n");
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
/* A sanity check on the stats */
if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(link_status)) < 0) {
printf("knet_link_get_status failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
if (link_status.stats.tx_data_packets != 2 ||
link_status.stats.rx_data_packets != 2 ||
link_status.stats.tx_data_bytes < KNET_MAX_PACKET_SIZE ||
link_status.stats.rx_data_bytes < KNET_MAX_PACKET_SIZE ||
link_status.stats.tx_data_bytes > KNET_MAX_PACKET_SIZE*2 ||
link_status.stats.rx_data_bytes > KNET_MAX_PACKET_SIZE*2) {
printf("stats look wrong: tx_packets: %" PRIu64 " (%" PRIu64 " bytes), rx_packets: %" PRIu64 " (%" PRIu64 " bytes)\n",
link_status.stats.tx_data_packets,
link_status.stats.tx_data_bytes,
link_status.stats.rx_data_packets,
link_status.stats.rx_data_bytes);
}
flush_logs(logfds[0], stdout);
if (knet_handle_setfwd(knet_h, 1) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
printf("try to send big packet to local datafd (bypass knet_send)\n");
if (write(datafd, &send_buff, sizeof(send_buff)) != KNET_MAX_PACKET_SIZE + 1) {
printf("Error writing to datafd: %s\n", strerror(errno));
}
if (!wait_for_packet(knet_h, 2, datafd, logfds[0], stdout)) {
printf("Received unexpected packet!\n");
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
}
int main(int argc, char *argv[])
{
printf("Testing with UDP\n");
test(KNET_TRANSPORT_UDP);
#ifdef HAVE_NETINET_SCTP_H
printf("Testing with SCTP\n");
test(KNET_TRANSPORT_SCTP);
#endif
return PASS;
}
diff --git a/libknet/tests/fun_acl_check.c b/libknet/tests/fun_acl_check.c
new file mode 100644
index 00000000..55cf09bb
--- /dev/null
+++ b/libknet/tests/fun_acl_check.c
@@ -0,0 +1,409 @@
+/*
+ * Copyright (C) 2021 Red Hat, Inc. All rights reserved.
+ *
+ * Authors: Christine Caulfield <ccaulfie@redhat.com>
+ *
+ * This software licensed under GPL-2.0+
+ */
+
+#include "config.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <poll.h>
+
+#include "libknet.h"
+
+#include "internals.h"
+#include "netutils.h"
+#include "test-common.h"
+
+
+/*
+ * Keep track of how many messages got through:
+ * clean + 3xACLs + QUIT
+ */
+#define CORRECT_NUM_MSGS 5
+static int msgs_recvd = 0;
+
+#define TESTNODES 2
+
+static pthread_mutex_t recv_mutex = PTHREAD_MUTEX_INITIALIZER;
+static int quit_recv_thread = 0;
+
+static int reply_pipe[2];
+
+#define FAIL_ON_ERR(fn) \
+ printf("FOE: %s\n", #fn); \
+ if ((res = fn) != 0) { \
+ int savederrno = errno; \
+ pthread_mutex_lock(&recv_mutex); \
+ quit_recv_thread = 1; \
+ pthread_mutex_unlock(&recv_mutex); \
+ if (recv_thread) { \
+ pthread_join(recv_thread, (void**)&thread_err); \
+ } \
+ knet_handle_stop_nodes(knet_h, TESTNODES); \
+ stop_logthread(); \
+ flush_logs(logfds[0], stdout); \
+ close_logpipes(logfds); \
+ close(reply_pipe[0]); \
+ close(reply_pipe[1]); \
+ if (res == -2) { \
+ exit(SKIP); \
+ } else { \
+ printf("*** FAIL on line %d %s failed: %s\n", __LINE__ , #fn, strerror(savederrno)); \
+ exit(FAIL); \
+ } \
+ }
+
+static int knet_send_str(knet_handle_t knet_h, char *str)
+{
+ return knet_send_sync(knet_h, str, strlen(str)+1, 0);
+}
+
+/*
+ * lo0 is filled in with the local address on return.
+ * lo1 is expected to be provided - it's the actual remote address to connect to.
+ */
+int dyn_knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
+ uint8_t transport, uint64_t flags, int family, int dynamic,
+ struct sockaddr_storage *lo0, struct sockaddr_storage *lo1)
+{
+ int err = 0, savederrno = 0;
+ uint32_t port;
+ char portstr[32];
+
+ for (port = 1025; port < 65536; port++) {
+ sprintf(portstr, "%u", port);
+ memset(lo0, 0, sizeof(struct sockaddr_storage));
+ if (family == AF_INET6) {
+ err = knet_strtoaddr("::1", portstr, lo0, sizeof(struct sockaddr_storage));
+ } else {
+ err = knet_strtoaddr("127.0.0.1", portstr, lo0, sizeof(struct sockaddr_storage));
+ }
+ if (err < 0) {
+ printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
+ goto out;
+ }
+ errno = 0;
+ if (dynamic) {
+ err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, NULL, flags);
+ } else {
+ err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, lo1, flags);
+ }
+ savederrno = errno;
+ if ((err < 0) && (savederrno != EADDRINUSE)) {
+ if (savederrno == EPROTONOSUPPORT && transport == KNET_TRANSPORT_SCTP) {
+ return -2;
+ } else {
+ printf("Unable to configure link: %s\n", strerror(savederrno));
+ goto out;
+ }
+ }
+ if (!err) {
+ printf("Using port %u\n", port);
+ goto out;
+ }
+ }
+
+ if (err) {
+ printf("No more ports available\n");
+ }
+out:
+ errno = savederrno;
+ return err;
+}
+
+static void *recv_messages(void *handle)
+{
+ knet_handle_t knet_h = (knet_handle_t)handle;
+ char buf[4096];
+ ssize_t len;
+ static int err = 0;
+ int savederrno = 0, quit = 0;
+
+ while ((len = knet_recv(knet_h, buf, sizeof(buf), 0)) && (!quit)) {
+ savederrno = errno;
+ pthread_mutex_lock(&recv_mutex);
+ quit = quit_recv_thread;
+ pthread_mutex_unlock(&recv_mutex);
+ if (quit) {
+ printf(" *** recv thread was requested to exit via FOE\n");
+ err = 1;
+ return &err;
+ }
+ if (len > 0) {
+ int res;
+
+ printf("recv: (%ld) %s\n", (long)len, buf);
+ msgs_recvd++;
+ if (strcmp("QUIT", buf) == 0) {
+ break;
+ }
+ if (buf[0] == '0') { /* We should not have received this! */
+ printf(" *** FAIL received packet that should have been blocked\n");
+ err = 1;
+ return &err;
+ }
+ /* Tell the main thread we have received something */
+ res = write(reply_pipe[1], ".", 1);
+ if (res != 1) {
+ printf(" *** FAIL to send response back to main thread\n");
+ err = 1;
+ return &err;
+ }
+ }
+ usleep(1000);
+ if (len < 0 && savederrno != EAGAIN) {
+ break;
+ }
+ }
+ printf("-- recv thread finished: %zd %d %s\n", len, errno, strerror(savederrno));
+ return &err;
+}
+
+static void notify_fn(void *private_data,
+ int datafd,
+ int8_t channel,
+ uint8_t tx_rx,
+ int error,
+ int errorno)
+{
+ printf("NOTIFY fn called\n");
+}
+
+/* A VERY basic filter because all data traffic is going to one place */
+static int dhost_filter(void *pvt_data,
+ const unsigned char *outdata,
+ ssize_t outdata_len,
+ uint8_t tx_rx,
+ knet_node_id_t this_host_id,
+ knet_node_id_t src_host_id,
+ int8_t *dst_channel,
+ knet_node_id_t *dst_host_ids,
+ size_t *dst_host_ids_entries)
+{
+ dst_host_ids[0] = 1;
+ *dst_host_ids_entries = 1;
+ return 0;
+}
+
+/* This used to be a pthread condition variable, but
+ there was a race where it could be triggered before
+ the main thread was waiting for it.
+ Go old-fashioned.
+*/
+static int wait_for_reply(int seconds)
+{
+ int res;
+ struct pollfd pfds;
+ char tmpbuf[32];
+
+ pfds.fd = reply_pipe[0];
+ pfds.events = POLLIN | POLLERR | POLLHUP;
+ pfds.revents = 0;
+
+ res = poll(&pfds, 1, seconds*1000);
+ if (res == 1) {
+ if (pfds.revents & POLLIN) {
+ res = read(reply_pipe[0], tmpbuf, sizeof(tmpbuf));
+ if (res > 0) {
+ return 0;
+ }
+ } else {
+ printf("Error on pipe poll revent = 0x%x\n", pfds.revents);
+ errno = EIO;
+ }
+ }
+ if (res == 0) {
+ errno = ETIMEDOUT;
+ return -1;
+ }
+
+ return -1;
+}
+
+static void test(int transport)
+{
+ knet_handle_t knet_h[TESTNODES+1];
+ int logfds[2];
+ struct sockaddr_storage lo0, lo1;
+ struct sockaddr_storage ss1, ss2;
+ int res;
+ pthread_t recv_thread = 0;
+ int *thread_err;
+ int datafd;
+ int8_t channel;
+ int seconds = 90; // dynamic tests take longer than normal tests
+
+ if (is_memcheck() || is_helgrind()) {
+ printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
+ seconds = seconds * 16;
+ }
+
+ memset(knet_h, 0, sizeof(knet_h));
+ memset(reply_pipe, 0, sizeof(reply_pipe));
+ memset(logfds, 0, sizeof(logfds));
+
+ FAIL_ON_ERR(pipe(reply_pipe));
+
+ // Initial setup gubbins
+ msgs_recvd = 0;
+ setup_logpipes(logfds);
+ start_logthread(logfds[1], stdout);
+ knet_handle_start_nodes(knet_h, TESTNODES, logfds, KNET_LOG_DEBUG);
+
+ FAIL_ON_ERR(knet_host_add(knet_h[2], 1));
+ FAIL_ON_ERR(knet_host_add(knet_h[1], 2));
+
+ FAIL_ON_ERR(knet_handle_enable_filter(knet_h[2], NULL, dhost_filter));
+
+ // Create the dynamic (receiving) link
+ FAIL_ON_ERR(dyn_knet_link_set_config(knet_h[1], 2, 0, transport, 0, AF_INET, 1, &lo0, NULL));
+
+ // Connect to the dynamic link
+ FAIL_ON_ERR(dyn_knet_link_set_config(knet_h[2], 1, 0, transport, 0, AF_INET, 0, &lo1, &lo0));
+
+ // All the rest of the setup gubbins
+ FAIL_ON_ERR(knet_handle_enable_sock_notify(knet_h[1], 0, &notify_fn));
+ FAIL_ON_ERR(knet_handle_enable_sock_notify(knet_h[2], 0, &notify_fn));
+
+ channel = datafd = 0;
+ FAIL_ON_ERR(knet_handle_add_datafd(knet_h[1], &datafd, &channel));
+ channel = datafd = 0;
+ FAIL_ON_ERR(knet_handle_add_datafd(knet_h[2], &datafd, &channel));
+
+ FAIL_ON_ERR(knet_link_set_enable(knet_h[1], 2, 0, 1));
+ FAIL_ON_ERR(knet_link_set_enable(knet_h[2], 1, 0, 1));
+
+ FAIL_ON_ERR(knet_handle_setfwd(knet_h[1], 1));
+ FAIL_ON_ERR(knet_handle_setfwd(knet_h[2], 1));
+
+ // Start receive thread
+ FAIL_ON_ERR(pthread_create(&recv_thread, NULL, recv_messages, (void *)knet_h[1]));
+
+ // Let everything settle down
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout));
+
+ /*
+ * TESTING STARTS HERE
+ * strings starting '1' should reach the receiving thread
+ * strings starting '0' should not
+ */
+
+ // No ACL
+ printf("Testing No ACL - this should get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "1No ACL - this should get through"));
+ FAIL_ON_ERR(wait_for_reply(seconds))
+
+ // Block traffic from this address.
+ memset(&ss1, 0, sizeof(ss1));
+ memset(&ss2, 0, sizeof(ss1));
+ knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1));
+ FAIL_ON_ERR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT));
+ // Accept ACL for when we remove them
+ FAIL_ON_ERR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_ACCEPT));
+
+ // This needs to go after the first ACLs are added
+ FAIL_ON_ERR(knet_handle_enable_access_lists(knet_h[1], 1));
+
+ printf("Testing Address blocked - this should NOT get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "0Address blocked - this should NOT get through"));
+
+ // Unblock and check again
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout));
+
+ printf("Testing Address unblocked - this should get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "1Address unblocked - this should get through"));
+ FAIL_ON_ERR(wait_for_reply(seconds));
+
+ // Block traffic using a netmask
+ knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1));
+ knet_strtoaddr("255.0.0.1","0", &ss2, sizeof(ss2));
+ FAIL_ON_ERR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT));
+
+ printf("Testing Netmask blocked - this should NOT get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "0Netmask blocked - this should NOT get through"));
+
+ // Unblock and check again
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout));
+
+ printf("Testing Netmask unblocked - this should get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "1Netmask unblocked - this should get through"));
+ FAIL_ON_ERR(wait_for_reply(seconds));
+
+ // Block traffic from a range
+ knet_strtoaddr("127.0.0.0", "0", &ss1, sizeof(ss1));
+ knet_strtoaddr("127.0.0.9", "0", &ss2, sizeof(ss2));
+ FAIL_ON_ERR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT));
+
+ printf("Testing Range blocked - this should NOT get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "0Range blocked - this should NOT get through"));
+
+ // Unblock and check again
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout));
+
+ printf("Testing Range unblocked - this should get through\n");
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "1Range unblocked - this should get through"));
+ FAIL_ON_ERR(wait_for_reply(seconds));
+
+ // Finish up - disable ACLS to make sure the QUIT message gets through
+ FAIL_ON_ERR(knet_handle_enable_access_lists(knet_h[1], 0));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout));
+ FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout));
+
+ FAIL_ON_ERR(knet_send_str(knet_h[2], "QUIT"));
+
+ // Check return from the receiving thread
+ pthread_join(recv_thread, (void**)&thread_err);
+ if (*thread_err) {
+ printf("Thread returned %d\n", *thread_err);
+ exit(FAIL);
+ }
+
+ // Tidy Up
+ knet_handle_stop_nodes(knet_h, TESTNODES);
+
+ stop_logthread();
+ flush_logs(logfds[0], stdout);
+ close_logpipes(logfds);
+ close(reply_pipe[0]);
+ close(reply_pipe[1]);
+
+ if (msgs_recvd != CORRECT_NUM_MSGS) {
+ printf("*** FAIL Recv thread got %d messages, expected %d\n", msgs_recvd, CORRECT_NUM_MSGS);
+ exit(FAIL);
+ }
+}
+
+int main(int argc, char *argv[])
+{
+ printf("Testing with UDP\n");
+ test(KNET_TRANSPORT_UDP);
+
+#ifdef HAVE_NETINET_SCTP_H
+ printf("Testing with SCTP currently disabled\n");
+ //test(KNET_TRANSPORT_SCTP);
+#endif
+
+ return PASS;
+}
diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c
index e55f5f73..2729f551 100644
--- a/libknet/tests/test-common.c
+++ b/libknet/tests/test-common.c
@@ -1,878 +1,889 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
*
* This software licensed under GPL-2.0+
*/
#include "config.h"
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <dirent.h>
#include <sys/select.h>
#include "libknet.h"
#include "test-common.h"
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
static int log_init = 0;
static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t log_thread;
static int log_thread_init = 0;
static int log_fds[2];
struct log_thread_data {
int logfd;
FILE *std;
};
static struct log_thread_data data;
static char plugin_path[PATH_MAX];
static int _read_pipe(int fd, char **file, size_t *length)
{
char buf[4096];
int n;
int done = 0;
*file = NULL;
*length = 0;
memset(buf, 0, sizeof(buf));
while (!done) {
n = read(fd, buf, sizeof(buf));
if (n < 0) {
if (errno == EINTR)
continue;
if (*file)
free(*file);
return n;
}
if (n == 0 && (!*length))
return 0;
if (n == 0)
done = 1;
if (*file)
*file = realloc(*file, (*length) + n + done);
else
*file = malloc(n + done);
if (!*file)
return -1;
memmove((*file) + (*length), buf, n);
*length += (done + n);
}
/* Null terminator */
(*file)[(*length) - 1] = 0;
return 0;
}
int execute_shell(const char *command, char **error_string)
{
pid_t pid;
int status, err = 0;
int fd[2];
size_t size = 0;
if ((command == NULL) || (!error_string)) {
errno = EINVAL;
return FAIL;
}
*error_string = NULL;
err = pipe(fd);
if (err)
goto out_clean;
pid = fork();
if (pid < 0) {
err = pid;
goto out_clean;
}
if (pid) { /* parent */
close(fd[1]);
err = _read_pipe(fd[0], error_string, &size);
if (err)
goto out_clean0;
waitpid(pid, &status, 0);
if (!WIFEXITED(status)) {
err = -1;
goto out_clean0;
}
if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
err = WEXITSTATUS(status);
goto out_clean0;
}
goto out_clean0;
} else { /* child */
close(0);
close(1);
close(2);
close(fd[0]);
dup2(fd[1], 1);
dup2(fd[1], 2);
close(fd[1]);
execlp("/bin/sh", "/bin/sh", "-c", command, NULL);
exit(FAIL);
}
out_clean:
close(fd[1]);
out_clean0:
close(fd[0]);
return err;
}
int is_memcheck(void)
{
char *val;
val = getenv("KNETMEMCHECK");
if (val) {
if (!strncmp(val, "yes", 3)) {
return 1;
}
}
return 0;
}
int is_helgrind(void)
{
char *val;
val = getenv("KNETHELGRIND");
if (val) {
if (!strncmp(val, "yes", 3)) {
return 1;
}
}
return 0;
}
void set_scheduler(int policy)
{
struct sched_param sched_param;
int err;
err = sched_get_priority_max(policy);
if (err < 0) {
printf("Could not get maximum scheduler priority\n");
exit(FAIL);
}
sched_param.sched_priority = err;
err = sched_setscheduler(0, policy, &sched_param);
if (err < 0) {
printf("Could not set priority\n");
exit(FAIL);
}
return;
}
int setup_logpipes(int *logfds)
{
if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) {
printf("Unable to setup logging pipe\n");
exit(FAIL);
}
return PASS;
}
void close_logpipes(int *logfds)
{
close(logfds[0]);
logfds[0] = 0;
close(logfds[1]);
logfds[1] = 0;
}
void flush_logs(int logfd, FILE *std)
{
struct knet_log_msg msg;
int len;
while (1) {
len = read(logfd, &msg, sizeof(msg));
if (len != sizeof(msg)) {
/*
* clear errno to avoid incorrect propagation
*/
errno = 0;
return;
}
msg.msg[sizeof(msg.msg) - 1] = 0;
fprintf(std, "[knet]: [%s] %s: %.*s\n",
knet_log_get_loglevel_name(msg.msglevel),
knet_log_get_subsystem_name(msg.subsystem),
KNET_MAX_LOG_MSG_SIZE, msg.msg);
}
}
static void *_logthread(void *args)
{
while (1) {
int num;
struct timeval tv = { 60, 0 };
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(data.logfd, &rfds);
num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
if (num < 0) {
fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n");
return NULL;
}
if (num == 0) {
fprintf(data.std, "[knet]: No logs in the last 60 seconds\n");
continue;
}
if (FD_ISSET(data.logfd, &rfds)) {
flush_logs(data.logfd, data.std);
}
}
}
int start_logthread(int logfd, FILE *std)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&log_thread_mutex);
if (savederrno) {
printf("Unable to get log_thread mutex lock\n");
return -1;
}
if (!log_thread_init) {
data.logfd = logfd;
data.std = std;
savederrno = pthread_create(&log_thread, 0, _logthread, NULL);
if (savederrno) {
printf("Unable to start logging thread: %s\n", strerror(savederrno));
pthread_mutex_unlock(&log_thread_mutex);
return -1;
}
log_thread_init = 1;
}
pthread_mutex_unlock(&log_thread_mutex);
return 0;
}
int stop_logthread(void)
{
int savederrno = 0;
void *retval;
savederrno = pthread_mutex_lock(&log_thread_mutex);
if (savederrno) {
printf("Unable to get log_thread mutex lock\n");
return -1;
}
if (log_thread_init) {
pthread_cancel(log_thread);
pthread_join(log_thread, &retval);
log_thread_init = 0;
}
pthread_mutex_unlock(&log_thread_mutex);
return 0;
}
static void stop_logging(void)
{
stop_logthread();
flush_logs(log_fds[0], stdout);
close_logpipes(log_fds);
}
int start_logging(FILE *std)
{
int savederrno = 0;
savederrno = pthread_mutex_lock(&log_mutex);
if (savederrno) {
printf("Unable to get log_mutex lock\n");
return -1;
}
if (!log_init) {
setup_logpipes(log_fds);
if (atexit(&stop_logging) != 0) {
printf("Unable to register atexit handler to stop logging: %s\n",
strerror(errno));
exit(FAIL);
}
if (start_logthread(log_fds[0], std) < 0) {
exit(FAIL);
}
log_init = 1;
}
pthread_mutex_unlock(&log_mutex);
return log_fds[1];
}
static int dir_filter(const struct dirent *dname)
{
if ( (strcmp(dname->d_name + strlen(dname->d_name)-3, ".so") == 0) &&
((strncmp(dname->d_name,"crypto", 6) == 0) ||
(strncmp(dname->d_name,"compress", 8) == 0))) {
return 1;
}
return 0;
}
/* Make sure the proposed plugin path has at least 1 of each plugin available
- just as a sanity check really */
static int contains_plugins(char *path)
{
struct dirent **namelist;
int n,i;
size_t j;
struct knet_compress_info compress_list[256];
struct knet_crypto_info crypto_list[256];
size_t num_compress, num_crypto;
size_t compress_found = 0;
size_t crypto_found = 0;
if (knet_get_compress_list(compress_list, &num_compress) == -1) {
return 0;
}
if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) {
return 0;
}
n = scandir(path, &namelist, dir_filter, alphasort);
if (n == -1) {
return 0;
}
/* Look for plugins in the list */
for (i=0; i<n; i++) {
for (j=0; j<num_crypto; j++) {
if (strlen(namelist[i]->d_name) >= 7 &&
strncmp(crypto_list[j].name, namelist[i]->d_name+7,
strlen(crypto_list[j].name)) == 0) {
crypto_found++;
}
}
for (j=0; j<num_compress; j++) {
if (strlen(namelist[i]->d_name) >= 9 &&
strncmp(compress_list[j].name, namelist[i]->d_name+9,
strlen(compress_list[j].name)) == 0) {
compress_found++;
}
}
free(namelist[i]);
}
free(namelist);
/* If at least one plugin was found (or none were built) */
if ((crypto_found || num_crypto == 0) &&
(compress_found || num_compress == 0)) {
return 1;
} else {
return 0;
}
}
/* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */
static char *find_plugins_path(void)
{
char *ld_libs_env = getenv("LD_LIBRARY_PATH");
if (ld_libs_env) {
char *ld_libs = strdup(ld_libs_env);
char *str = strtok(ld_libs, ":");
while (str) {
if (contains_plugins(str)) {
strncpy(plugin_path, str, sizeof(plugin_path)-1);
free(ld_libs);
printf("Using plugins from %s\n", plugin_path);
return plugin_path;
}
str = strtok(NULL, ":");
}
free(ld_libs);
}
return NULL;
}
knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level)
{
knet_handle_t knet_h = knet_handle_new_ex(1, logfds[1], log_level, 0);
char *plugins_path;
if (knet_h) {
printf("knet_handle_new at %p\n", knet_h);
plugins_path = find_plugins_path();
/* Use plugins from the build tree */
if (plugins_path) {
knet_h->plugin_path = plugins_path;
}
return knet_h;
} else {
printf("knet_handle_new failed: %s\n", strerror(errno));
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}
}
int knet_handle_stop(knet_handle_t knet_h)
{
size_t i, j;
knet_node_id_t host_ids[KNET_MAX_HOST];
uint8_t link_ids[KNET_MAX_LINK];
size_t host_ids_entries = 0, link_ids_entries = 0;
unsigned int enabled;
if (!knet_h) {
errno = EINVAL;
return -1;
}
if (knet_handle_setfwd(knet_h, 0) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
return -1;
}
if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
printf("knet_host_get_host_list failed: %s\n", strerror(errno));
return -1;
}
for (i = 0; i < host_ids_entries; i++) {
if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
printf("knet_link_get_link_list failed: %s\n", strerror(errno));
return -1;
}
for (j = 0; j < link_ids_entries; j++) {
if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
printf("knet_link_get_enable failed: %s\n", strerror(errno));
return -1;
}
if (enabled) {
if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) {
printf("knet_link_set_enable failed: %s\n", strerror(errno));
return -1;
}
}
printf("clearing config for: %p host: %u link: %zu\n", knet_h, host_ids[i], j);
knet_link_clear_config(knet_h, host_ids[i], j);
}
if (knet_host_remove(knet_h, host_ids[i]) < 0) {
printf("knet_host_remove failed: %s\n", strerror(errno));
return -1;
}
}
if (knet_handle_free(knet_h)) {
printf("knet_handle_free failed: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family)
{
in_port_t port;
char portstr[32];
if (offset < 0) {
/*
* api_knet_link_set_config needs to access the API directly, but
* it does not send any traffic, so it´s safe to ask the kernel
* for a random port.
*/
port = 0;
} else {
/* Use the pid if we can. but makes sure its in a sensible range */
port = (getpid() + offset) % (65536-1024) + 1024;
}
sprintf(portstr, "%u", port);
memset(lo, 0, sizeof(struct sockaddr_storage));
printf("Using port %u\n", port);
if (family == AF_INET6) {
return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
}
return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
}
int make_local_sockaddr(struct sockaddr_storage *lo, int offset)
{
return _make_local_sockaddr(lo, offset, AF_INET);
}
int make_local_sockaddr6(struct sockaddr_storage *lo, int offset)
{
return _make_local_sockaddr(lo, offset, AF_INET6);
}
int _knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
uint8_t transport, uint64_t flags, int family, int dynamic,
struct sockaddr_storage *lo)
{
int err = 0, savederrno = 0;
uint32_t port;
char portstr[32];
for (port = 1025; port < 65536; port++) {
sprintf(portstr, "%u", port);
memset(lo, 0, sizeof(struct sockaddr_storage));
if (family == AF_INET6) {
err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
} else {
err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
}
if (err < 0) {
printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
goto out;
}
errno = 0;
if (dynamic) {
err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags);
} else {
err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags);
}
savederrno = errno;
if ((err < 0) && (savederrno != EADDRINUSE)) {
printf("Unable to configure link: %s\n", strerror(savederrno));
goto out;
}
if (!err) {
printf("Using port %u\n", port);
goto out;
}
}
if (err) {
printf("No more ports available\n");
}
out:
errno = savederrno;
return err;
}
void test_sleep(knet_handle_t knet_h, int seconds)
{
if (is_memcheck() || is_helgrind()) {
printf("Test suite is running under valgrind, adjusting sleep timers\n");
seconds = seconds * 16;
}
sleep(seconds);
}
int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd, FILE *std)
{
fd_set rfds;
struct timeval tv;
int err = 0, i = 0;
if (is_memcheck() || is_helgrind()) {
printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n");
seconds = seconds * 16;
}
try_again:
FD_ZERO(&rfds);
FD_SET(datafd, &rfds);
tv.tv_sec = 1;
tv.tv_usec = 0;
err = select(datafd+1, &rfds, NULL, NULL, &tv);
/*
* on slow arches the first call to select can return 0.
* pick an arbitrary 10 times loop (multiplied by waiting seconds)
* before failing.
*/
if ((!err) && (i < seconds)) {
flush_logs(logfd, std);
i++;
goto try_again;
}
if ((err > 0) && (FD_ISSET(datafd, &rfds))) {
return 0;
}
errno = ETIMEDOUT;
return -1;
}
/*
* functional tests helpers
*/
void knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfds[2], uint8_t log_level)
{
uint8_t i;
char *plugins_path = find_plugins_path();
for (i = 1; i <= numnodes; i++) {
knet_h[i] = knet_handle_new_ex(i, logfds[1], log_level, 0);
if (!knet_h[i]) {
printf("failed to create handle: %s\n", strerror(errno));
break;
} else {
printf("knet_h[%u] at %p\n", i, knet_h[i]);
}
/* Use plugins from the build tree */
if (plugins_path) {
knet_h[i]->plugin_path = plugins_path;
}
}
if (i < numnodes) {
knet_handle_stop_nodes(knet_h, i);
exit(FAIL);
}
return;
}
void knet_handle_stop_nodes(knet_handle_t knet_h[], uint8_t numnodes)
{
uint8_t i;
for (i = 1; i <= numnodes; i++) {
- printf("stopping handle %u at %p\n", i, knet_h[i]);
- knet_handle_stop(knet_h[i]);
+ if (knet_h[i]) {
+ printf("stopping handle %u at %p\n", i, knet_h[i]);
+ knet_handle_stop(knet_h[i]);
+ }
}
return;
}
void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport)
{
uint8_t i, x, j;
struct sockaddr_storage src, dst;
int offset = 0;
int res;
for (i = 1; i <= numnodes; i++) {
for (j = 1; j <= numnodes; j++) {
/*
* don´t connect to itself
*/
if (j == i) {
continue;
}
printf("host %u adding host: %u\n", i, j);
if (knet_host_add(knet_h[i], j) < 0) {
printf("Unable to add host: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
for (x = 0; x < numlinks; x++) {
res = -1;
offset = 0;
while (i + x + offset++ < 65535 && res != 0) {
if (_make_local_sockaddr(&src, i + x + offset, family) < 0) {
printf("Unable to convert src to sockaddr: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
if (_make_local_sockaddr(&dst, j + x + offset, family) < 0) {
printf("Unable to convert dst to sockaddr: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
res = knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0);
}
printf("joining node %u with node %u via link %u src offset: %u dst offset: %u\n", i, j, x, i+x, j+x);
if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) {
printf("unable to enable link: %s\n", strerror(errno));
knet_handle_stop_nodes(knet_h, numnodes);
exit(FAIL);
}
}
}
}
for (i = 1; i <= numnodes; i++) {
wait_for_nodes_state(knet_h[i], numnodes, 1, 600, knet_h[1]->logfd, stdout);
}
return;
}
static int target=0;
static pthread_mutex_t wait_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wait_cond = PTHREAD_COND_INITIALIZER;
static int count_nodes(knet_handle_t knet_h)
{
int nodes = 0;
int i;
for (i=0; i< KNET_MAX_HOST; i++) {
if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) {
nodes++;
}
}
return nodes;
}
static void nodes_notify_callback(void *private_data,
knet_node_id_t host_id,
uint8_t reachable, uint8_t remote, uint8_t external)
{
knet_handle_t knet_h = (knet_handle_t) private_data;
int nodes;
nodes = count_nodes(knet_h);
if (nodes == target) {
pthread_cond_signal(&wait_cond);
}
}
static void host_notify_callback(void *private_data,
knet_node_id_t host_id,
uint8_t reachable, uint8_t remote, uint8_t external)
{
knet_handle_t knet_h = (knet_handle_t) private_data;
if (knet_h->host_index[host_id]->status.reachable == 1) {
pthread_cond_signal(&wait_cond);
}
}
/* Wait for a cluster of 'numnodes' to come up/go down */
int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
uint8_t state, uint32_t timeout,
int logfd, FILE *std)
{
struct timespec ts;
- int res;
+ int res, savederrno = 0;
if (state) {
target = numnodes-1; /* exclude us */
} else {
target = 0; /* Wait for all to go down */
}
/* Set this before checking existing status or there's a race condition */
knet_host_enable_status_change_notify(knet_h,
(void *)(long)knet_h,
nodes_notify_callback);
/* Check we haven't already got all the nodes in the correct state */
if (count_nodes(knet_h) == target) {
fprintf(stderr, "target already reached\n");
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
return 0;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout;
if (pthread_mutex_lock(&wait_mutex)) {
fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
return -1;
}
+
res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
- if (res == -1 && errno == ETIMEDOUT) {
+ if (res != 0 && res != ETIMEDOUT) {
+ fprintf(stderr, "pthread_cond_timedwait fatal error\n");
+ errno = res;
+ return -1;
+ }
+ if (res == ETIMEDOUT) {
fprintf(stderr, "Timed-out\n");
+ savederrno = ETIMEDOUT;
+ res = -1;
}
pthread_mutex_unlock(&wait_mutex);
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
+ errno = savederrno;
return res;
}
/* Wait for a single node to come up */
int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std)
{
int res;
struct timespec ts;
if (is_memcheck() || is_helgrind()) {
printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
seconds = seconds * 16;
}
/* Set this before checking existing status or there's a race condition */
knet_host_enable_status_change_notify(knet_h,
(void *)(long)knet_h,
host_notify_callback);
/* Check it's not already reachable */
if (knet_h->host_index[host_id]->status.reachable == 1) {
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
flush_logs(logfd, std);
return 0;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += seconds;
if (pthread_mutex_lock(&wait_mutex)) {
fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno));
return -1;
}
res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts);
if (res == -1 && errno == ETIMEDOUT) {
fprintf(stderr, "Timed-out\n");
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
pthread_mutex_unlock(&wait_mutex);
flush_logs(logfd, std);
return -1;
}
pthread_mutex_unlock(&wait_mutex);
knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
/* Still wait for it to settle */
flush_logs(logfd, std);
test_sleep(knet_h, 1);
return 0;
}
diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c
index d671a845..213860fd 100644
--- a/libknet/threads_rx.c
+++ b/libknet/threads_rx.c
@@ -1,1047 +1,1058 @@
/*
* Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>
#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"
/*
* RECV
*/
/*
* return 1 if a > b
* return -1 if b > a
* return 0 if they are equal
*/
static inline int timecmp(struct timespec a, struct timespec b)
{
if (a.tv_sec != b.tv_sec) {
if (a.tv_sec > b.tv_sec) {
return 1;
} else {
return -1;
}
} else {
if (a.tv_nsec > b.tv_nsec) {
return 1;
} else if (a.tv_nsec < b.tv_nsec) {
return -1;
} else {
return 0;
}
}
}
/*
* this functions needs to return an index (0 to 7)
* to a knet_host_defrag_buf. (-1 on errors)
*/
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
int i, oldest;
/*
* check if there is a buffer already in use handling the same seq_num
*/
for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
return i;
}
}
}
/*
* If there is no buffer that's handling the current seq_num
* either it's new or it's been reclaimed already.
* check if it's been reclaimed/seen before using the defrag circular
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}
/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
/*
* see if there is a free buffer
*/
for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
if (!src_host->defrag_buf[i].in_use) {
return i;
}
}
/*
* at this point, there are no free buffers, the pckt is new
* and we need to reclaim a buffer, and we will take the one
* with the oldest timestamp. It's as good as any.
*/
oldest = 0;
for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
oldest = i;
}
}
src_host->defrag_buf[oldest].in_use = 0;
return oldest;
}
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
struct knet_host_defrag_buf *defrag_buf;
int defrag_buf_idx;
defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
if (defrag_buf_idx < 0) {
return 1;
}
defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];
/*
* if the buf is not is use, then make sure it's clean
*/
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
}
/*
* update timestamp on the buffer
*/
clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
*/
return 1;
}
/*
* we need to handle the last packet with gloves due to its different size
*/
if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
defrag_buf->last_frag_size = *len;
/*
* in the event when the last packet arrives first,
* we still don't know the offset vs the other fragments (based on MTU),
* so we store the fragment at the end of the buffer where it's safe
* and take a copy of the len so that we can restore its offset later.
* remember we can't use the local MTU for this calculation because pMTU
* can be asymettric between the same hosts.
*/
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}
if (defrag_buf->frag_size) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
}
defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
/*
* special case the last pckt
*/
if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
/*
* recalculate packet lenght
*/
*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
/*
* free this buffer
*/
defrag_buf->in_use = 0;
return 0;
}
return 1;
}
+/*
+ * processing incoming packets vs access lists
+ */
+static int _check_rx_acl(knet_handle_t knet_h, struct knet_link *src_link, const struct knet_mmsghdr *msg)
+{
+ if (knet_h->use_access_lists) {
+ if (!check_validate(knet_h, src_link, msg->msg_hdr.msg_name)) {
+ char src_ipaddr[KNET_MAX_HOST_LEN];
+ char src_port[KNET_MAX_PORT_LEN];
+
+ memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
+ memset(src_port, 0, KNET_MAX_PORT_LEN);
+ if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
+ src_ipaddr, KNET_MAX_HOST_LEN,
+ src_port, KNET_MAX_PORT_LEN) < 0) {
+
+ log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
+ } else {
+ log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
+ }
+ return 0;
+ }
+ }
+ return 1;
+}
+
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0, stats_err = 0;
ssize_t outlen;
struct knet_host *src_host;
struct knet_link *src_link;
unsigned long long latency_last;
knet_node_id_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
uint64_t decrypt_time = 0;
struct timespec recvtime;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct iovec iov_out[1];
int8_t channel;
seq_num_t recv_seq_num;
int wipe_bufs = 0;
int try_decrypt = 0, decrypted = 0, i, found_link = 0;
for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
if (knet_h->crypto_instance[i]) {
try_decrypt = 1;
break;
}
}
if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
return;
}
if (try_decrypt) {
struct timespec start_time;
struct timespec end_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
if (crypto_authenticate_and_decrypt(knet_h,
(unsigned char *)inbuf,
len,
knet_h->recv_from_links_buf_decrypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
return;
}
log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
} else {
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &decrypt_time);
len = outlen;
inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
decrypted = 1;
}
}
if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
return;
}
if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
return;
}
inbuf->kh_node = ntohs(inbuf->kh_node);
src_host = knet_h->host_index[inbuf->kh_node];
if (src_host == NULL) { /* host not found */
log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
return;
}
if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
/* be aware this works only for PING / PONG and PMTUd packets! */
src_link = src_host->link +
(inbuf->khp_ping_link % KNET_MAX_LINK);
+ if (!_check_rx_acl(knet_h, src_link, msg)) {
+ return;
+ }
if (src_link->dynamic == KNET_LINK_DYNIP) {
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
src_host->host_id, src_link->link_id);
memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
} else {
log_info(knet_h, KNET_SUB_RX,
"host: %u link: %u new connection established from: %s %s",
src_host->host_id, src_link->link_id,
src_link->status.dst_ipaddr, src_link->status.dst_port);
}
}
/*
* transport has already accepted the connection here
* otherwise we would not be receiving packets
*/
transport_link_dyn_connect(knet_h, sockfd, src_link);
}
} else { /* data packet */
for (i = 0; i < KNET_MAX_LINK; i++) {
src_link = &src_host->link[i];
if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
found_link = 1;
break;
}
}
- if (!found_link) {
+ if (found_link) {
+ /*
+ * this check is currently redundant.. Keep it here for now
+ */
+ if (!_check_rx_acl(knet_h, src_link, msg)) {
+ return;
+ }
+ } else {
log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
return;
}
}
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err) {
log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
src_host->host_id, src_link->link_id, strerror(savederrno));
return;
}
switch (inbuf->kh_type) {
case KNET_HEADER_TYPE_DATA:
/* data stats at the top for consistency with TX */
src_link->status.stats.rx_data_packets++;
src_link->status.stats.rx_data_bytes += len;
if (decrypted) {
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
knet_h->stats.rx_crypt_time_min = decrypt_time;
}
if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
knet_h->stats.rx_crypt_time_max = decrypt_time;
}
knet_h->stats.rx_crypt_time_ave =
(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
knet_h->stats.rx_crypt_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (!src_host->status.reachable) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
return;
}
inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
src_host->got_data = 1;
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}
if (inbuf->khp_data_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
len = len + KNET_HEADER_DATA_SIZE;
}
if (inbuf->khp_data_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
clock_gettime(CLOCK_MONOTONIC, &end_time);
timespec_diff(start_time, end_time, &compress_time);
if (!err) {
/* Collect stats */
if (compress_time < knet_h->stats.rx_compress_time_min) {
knet_h->stats.rx_compress_time_min = compress_time;
}
if (compress_time > knet_h->stats.rx_compress_time_max) {
knet_h->stats.rx_compress_time_max = compress_time;
}
knet_h->stats.rx_compress_time_ave =
(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
compress_time) / (knet_h->stats.rx_compressed_packets+1);
knet_h->stats.rx_compressed_packets++;
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
} else {
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
err, strerror(errno));
return;
}
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
if (knet_h->enabled != 1) /* data forward is disabled */
break;
if (knet_h->dst_host_filter_fn) {
size_t host_idx;
int found = 0;
bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
&channel,
dst_host_ids,
&dst_host_ids_entries);
if (bcast < 0) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
return;
}
if ((!bcast) && (!dst_host_ids_entries)) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
return;
}
/* check if we are dst for this packet */
if (!bcast) {
if (dst_host_ids_entries > KNET_MAX_HOST) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
return;
}
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
if (dst_host_ids[host_idx] == knet_h->host_id) {
found = 1;
break;
}
}
if (!found) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
return;
}
}
}
if (!knet_h->sockfd[channel].in_use) {
pthread_mutex_unlock(&src_link->link_stats_mutex);
log_debug(knet_h, KNET_SUB_RX,
"received packet for channel %d but there is no local sock connected",
channel);
return;
}
outlen = 0;
memset(iov_out, 0, sizeof(iov_out));
retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
log_debug(knet_h, KNET_SUB_RX,
"Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
iov_out[0].iov_len, outlen);
goto retry;
}
if (outlen <= 0) {
knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
knet_h->sockfd[channel].sockfd[0],
channel,
KNET_NOTIFY_RX,
outlen,
errno);
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
}
break;
case KNET_HEADER_TYPE_PING:
outlen = KNET_HEADER_PING_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PONG;
inbuf->kh_node = htons(knet_h->host_id);
recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
src_link->status.stats.rx_ping_packets++;
src_link->status.stats.rx_ping_bytes += len;
wipe_bufs = 0;
if (!inbuf->khp_ping_timed) {
/*
* we might be receiving this message from all links, but we want
* to process it only the first time
*/
if (recv_seq_num != src_host->untimed_rx_seq_num) {
/*
* cache the untimed seq num
*/
src_host->untimed_rx_seq_num = recv_seq_num;
/*
* if the host has received data in between
* untimed ping, then we don't need to wipe the bufs
*/
if (src_host->got_data) {
src_host->got_data = 0;
wipe_bufs = 0;
} else {
wipe_bufs = 1;
}
}
_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
} else {
/*
* pings always arrives in bursts over all the link
* catch the first of them to cache the seq num and
* avoid duplicate processing
*/
if (recv_seq_num != src_host->timed_rx_seq_num) {
src_host->timed_rx_seq_num = recv_seq_num;
if (recv_seq_num == 0) {
_seq_num_lookup(src_host, recv_seq_num, 0, 1);
}
}
}
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pong_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
retry_pong:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pong_errors++;
break;
case 0: /* ignore error and continue */
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pong_retries++;
goto retry_pong;
break;
}
}
src_link->status.stats.tx_pong_packets++;
src_link->status.stats.tx_pong_bytes += outlen;
}
break;
case KNET_HEADER_TYPE_PONG:
src_link->status.stats.rx_pong_packets++;
src_link->status.stats.rx_pong_bytes += len;
clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
timespec_diff(recvtime,
src_link->status.pong_last, &latency_last);
if ((latency_last / 1000llu) > src_link->pong_timeout) {
log_debug(knet_h, KNET_SUB_RX,
"Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
src_host->host_id, src_link->link_id);
} else {
/*
* in words : ('previous mean' * '(count -1)') + 'new value') / 'count'
*/
src_link->latency_cur_samples++;
/*
* limit to max_samples (precision)
*/
if (src_link->latency_cur_samples >= src_link->latency_max_samples) {
src_link->latency_cur_samples = src_link->latency_max_samples;
}
src_link->status.latency =
(((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples);
if (src_link->status.latency < src_link->pong_timeout_adj) {
if (!src_link->status.connected) {
if (src_link->received_pong >= src_link->pong_count) {
log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
src_host->host_id, src_link->link_id);
_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
} else {
src_link->received_pong++;
log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
src_host->host_id, src_link->link_id, src_link->received_pong);
}
}
}
/* Calculate latency stats */
if (src_link->status.latency > src_link->status.stats.latency_max) {
src_link->status.stats.latency_max = src_link->status.latency;
}
if (src_link->status.latency < src_link->status.stats.latency_min) {
src_link->status.stats.latency_min = src_link->status.latency;
}
/*
* those 2 lines below make all latency average calculations consistent and capped to
* link precision. In future we will kill the one above to keep only this one in
* the stats structure, but for now we leave it around to avoid API/ABI
* breakage as we backport the fixes to stable
*/
src_link->status.stats.latency_ave = src_link->status.latency;
src_link->status.stats.latency_samples = src_link->latency_cur_samples;
}
break;
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);
if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}
/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);
savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);
src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
/*
* this is normal if a fd got an event and before we grab the read lock
* and the link is removed by another thread
*/
goto exit_unlock;
}
transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
/*
* reset msg_namelen to buffer size because after recvmmsg
* each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
*/
for (i = 0; i < PCKT_RX_BUFS; i++) {
msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
}
msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;
/*
* WARNING: man page for recvmmsg is wrong. Kernel implementation here:
* recvmmsg can return:
* -1 on error
* 0 if the previous run of recvmmsg recorded an error on the socket
* N number of messages (see exception below).
*
* If there is an error from recvmsg after receiving a frame or more, the recvmmsg
* loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
* it will be visibile in the next run.
*
* Need to be careful how we handle errors at this stage.
*
* error messages need to be handled on a per transport/protocol base
* at this point we have different layers of error handling
* - msg_recv < 0 -> error from this run
* msg_recv = 0 -> error from previous run and error on socket needs to be cleared
* - per-transport message data
* example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
* but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
* - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
* errno set. That means the error api needs to be able to abort the loop below.
*/
if (msg_recv <= 0) {
transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
goto exit_unlock;
}
for (i = 0; i < msg_recv; i++) {
err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
/*
* TODO: make this section silent once we are confident
* all protocols packet handlers are good
*/
switch(err) {
case KNET_TRANSPORT_RX_ERROR: /* on error */
log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
break;
case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
goto exit_unlock;
break;
case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
- /*
- * processing incoming packets vs access lists
- */
- if ((knet_h->use_access_lists) &&
- (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) {
- if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) {
- char src_ipaddr[KNET_MAX_HOST_LEN];
- char src_port[KNET_MAX_PORT_LEN];
-
- memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
- memset(src_port, 0, KNET_MAX_PORT_LEN);
- if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name),
- src_ipaddr, KNET_MAX_HOST_LEN,
- src_port, KNET_MAX_PORT_LEN) < 0) {
-
- log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
- } else {
- log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port);
- }
- /*
- * continue processing the other packets
- */
- continue;
- }
- }
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
break;
case KNET_TRANSPORT_RX_OOB_DATA_STOP:
log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
goto exit_unlock;
break;
}
}
exit_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
struct knet_mmsghdr msg[PCKT_RX_BUFS];
struct iovec iov_in[PCKT_RX_BUFS];
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
memset(&msg, 0, sizeof(msg));
memset(&events, 0, sizeof(events));
for (i = 0; i < PCKT_RX_BUFS; i++) {
iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
iov_in[i].iov_len = KNET_DATABUFSIZE;
memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
msg[i].msg_hdr.msg_name = &address[i];
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */
msg[i].msg_hdr.msg_iov = &iov_in[i];
msg[i].msg_hdr.msg_iovlen = 1;
}
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
/*
* the RX threads only need to notify that there has been at least
* one successful run after queue flush has been requested.
* See setfwd in handle.c
*/
if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}
set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
return NULL;
}
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
int savederrno = 0;
ssize_t err = 0;
struct iovec iov_in;
if (!_is_valid_handle(knet_h)) {
return -1;
}
if (buff == NULL) {
errno = EINVAL;
return -1;
}
if (buff_len <= 0) {
errno = EINVAL;
return -1;
}
if (buff_len > KNET_MAX_PACKET_SIZE) {
errno = EINVAL;
return -1;
}
if (channel < 0) {
errno = EINVAL;
return -1;
}
if (channel >= KNET_DATAFD_MAX) {
errno = EINVAL;
return -1;
}
savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
goto out_unlock;
}
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)buff;
iov_in.iov_len = buff_len;
err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
savederrno = errno;
out_unlock:
pthread_rwlock_unlock(&knet_h->global_rwlock);
errno = err ? savederrno : 0;
return err;
}
diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c
index 94891f93..b505f429 100644
--- a/libknet/transport_sctp.c
+++ b/libknet/transport_sctp.c
@@ -1,1622 +1,1646 @@
/*
* Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved.
*
* Author: Christine Caulfield <ccaulfie@redhat.com>
*
* This software licensed under LGPL-2.0+
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <assert.h>
#include "compat.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "links_acl_ip.h"
#include "logging.h"
#include "netutils.h"
#include "common.h"
#include "transport_common.h"
#include "transports.h"
#include "threads_common.h"
#ifdef HAVE_NETINET_SCTP_H
#include <netinet/sctp.h>
#include "transport_sctp.h"
typedef struct sctp_handle_info {
struct qb_list_head listen_links_list;
struct qb_list_head connect_links_list;
int connect_epollfd;
int connectsockfd[2];
int listen_epollfd;
int listensockfd[2];
pthread_t connect_thread;
pthread_t listen_thread;
socklen_t event_subscribe_kernel_size;
char *event_subscribe_buffer;
} sctp_handle_info_t;
/*
* use by fd_tracker data type
*/
#define SCTP_NO_LINK_INFO 0
#define SCTP_LISTENER_LINK_INFO 1
#define SCTP_ACCEPTED_LINK_INFO 2
#define SCTP_CONNECT_LINK_INFO 3
/*
* this value is per listener
*/
#define MAX_ACCEPTED_SOCKS 256
typedef struct sctp_listen_link_info {
struct qb_list_head list;
int listen_sock;
int accepted_socks[MAX_ACCEPTED_SOCKS];
struct sockaddr_storage src_address;
int on_listener_epoll;
int on_rx_epoll;
int sock_shutdown;
} sctp_listen_link_info_t;
typedef struct sctp_accepted_link_info {
char mread_buf[KNET_DATABUFSIZE];
ssize_t mread_len;
sctp_listen_link_info_t *link_info;
} sctp_accepted_link_info_t ;
typedef struct sctp_connect_link_info {
struct qb_list_head list;
sctp_listen_link_info_t *listener;
struct knet_link *link;
struct sockaddr_storage dst_address;
int connect_sock;
int on_rx_epoll;
int close_sock;
int sock_shutdown;
} sctp_connect_link_info_t;
/*
* socket handling functions
*
* those functions do NOT perform locking. locking
* should be handled in the right context from callers
*/
/*
* sockets are removed from rx_epoll from callers
* see also error handling functions
*/
static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
struct epoll_event ev;
sctp_connect_link_info_t *info = kn_link->transport_link;
if (info->connect_sock != -1) {
if (info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_rx_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
} else {
close(info->connect_sock);
info->connect_sock = -1;
}
}
exit_error:
errno = savederrno;
return err;
}
static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS,
handle_info->event_subscribe_buffer,
handle_info->event_subscribe_kernel_size) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s",
type, strerror(savederrno));
}
errno = savederrno;
return err;
}
static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type)
{
int err = 0, savederrno = 0;
int value;
int level;
#ifdef SOL_SCTP
level = SOL_SCTP;
#else
level = IPPROTO_SCTP;
#endif
if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
value = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set reuseaddr on socket %d: %s",
sock, strerror(savederrno));
goto exit_error;
}
value = 1;
if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s",
strerror(savederrno));
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, sock, type) < 0) {
savederrno = errno;
err = -1;
}
exit_error:
errno = savederrno;
return err;
}
static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info = kn_link->transport_link;
if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) {
savederrno = errno;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP socket %d received error: %s", info->connect_sock, strerror(savederrno));
if ((savederrno != EALREADY) && (savederrno != EINPROGRESS) && (savederrno != EISCONN)) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s",
info->connect_sock, strerror(savederrno));
}
}
errno = savederrno;
return err;
}
static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
struct epoll_event ev;
sctp_connect_link_info_t *info = kn_link->transport_link;
int connect_sock;
struct sockaddr_storage connect_addr;
connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (connect_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(&connect_addr, 0, sizeof(struct sockaddr_storage));
if (knet_strtoaddr(kn_link->status.src_ipaddr, "0", &connect_addr, sockaddr_len(&connect_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to resolve connecting socket: %s",
strerror(savederrno));
goto exit_error;
}
if (bind(connect_sock, (struct sockaddr *)&connect_addr, sockaddr_len(&connect_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind connecting socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, sockaddr_len(&kn_link->src_addr), info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = connect_sock;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 1;
info->connect_sock = connect_sock;
info->close_sock = 0;
kn_link->outsock = info->connect_sock;
if (_reconnect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
exit_error:
if (err) {
if (connect_sock >= 0) {
close(connect_sock);
}
}
errno = savederrno;
return err;
}
static void _lock_sleep_relock(knet_handle_t knet_h)
{
int i = 0;
/* Don't hold onto the lock while sleeping */
pthread_rwlock_unlock(&knet_h->global_rwlock);
while (i < 5) {
usleep(KNET_THREADS_TIMERES / 16);
if (!pthread_rwlock_rdlock(&knet_h->global_rwlock)) {
/*
* lock acquired, we can go out
*/
return;
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get read lock!");
i++;
}
}
/*
* time to crash! if we cannot re-acquire the lock
* there is no easy way out of this one
*/
assert(0);
}
int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
if (recv_err < 0) {
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
if (connect_info->link->transport_connected == 0) {
return -1;
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (listen_info->on_rx_epoll == 0) {
return -1;
}
}
break;
}
if (recv_errno == EAGAIN) {
#ifdef DEBUG
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd);
#endif
_lock_sleep_relock(knet_h);
return 1;
}
return -1;
}
return 0;
}
/*
* socket error management functions
*
* both called with global read lock.
*
* NOTE: we need to remove the fd from the epoll as soon as possible
* even before we notify the respective thread to take care of it
* because scheduling can make it so that this thread will overload
* and the threads supposed to take care of the error will never
* be able to take action.
* we CANNOT handle FDs here directly (close/reconnect/etc) due
* to locking context. We need to delegate that to their respective
* management threads within the global write lock.
*
* this function is called from:
* - RX thread with recv_err <= 0 directly on recvmmsg error
* - transport_rx_is_data when msg_len == 0 (recv_err = 1)
* - transport_rx_is_data on notification (recv_err = 2)
*
* basically this small abuse of recv_err is to detect notifications
* generated by sockets created by listen().
*/
int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno)
{
struct epoll_event ev;
sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_listen_link_info_t *listen_info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) {
case SCTP_CONNECT_LINK_INFO:
/*
* all connect link have notifications enabled
* and we accept only data from notification and
* generic recvmmsg errors.
*
* Errors generated by msg_len 0 can be ignored because
* they follow a notification (double notification)
*/
if (recv_err != 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
}
break;
case SCTP_ACCEPTED_LINK_INFO:
listen_info = accepted_info->link_info;
if (listen_info->listen_sock != sockfd) {
if (recv_err != 1) {
if (listen_info->on_rx_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = sockfd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
return -1;
}
listen_info->on_rx_epoll = 0;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd);
if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno));
}
}
} else {
/*
* this means the listen() socket has generated
* a notification. now what? :-)
*/
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd);
}
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd);
break;
}
/*
* Under RX pressure we need to give time to IPC to pick up the message
*/
_lock_sleep_relock(knet_h);
return 0;
}
/*
* NOTE: sctp_transport_rx_is_data is called with global rdlock
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
size_t i;
struct iovec *iov = msg->msg_hdr.msg_iov;
size_t iovlen = msg->msg_hdr.msg_iovlen;
struct sctp_assoc_change *sac;
union sctp_notification *snp;
sctp_accepted_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data;
sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data;
if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) {
if (msg->msg_len == 0) {
/*
* NOTE: with event notification enabled, we receive error twice:
* 1) from the event notification
* 2) followed by a 0 byte msg_len
*
* the event handler should take care to avoid #2 by stopping
* the rx thread from processing more packets than necessary.
*/
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
if (connect_info->sock_shutdown) {
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
} else {
if (listen_info->link_info->sock_shutdown) {
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
}
/*
* this is pretty much dead code and we should never hit it.
* keep it for safety and avoid the rx thread to process
* bad info / data.
*/
return KNET_TRANSPORT_RX_NOT_DATA_STOP;
}
/*
* missing MSG_EOR has to be treated as a short read
* from the socket and we need to fill in the mread buf
* while we wait for MSG_EOR
*/
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
/*
* copy the incoming data into mread_buf + mread_len (incremental)
* and increase mread_len
*/
memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
listen_info->mread_len = listen_info->mread_len + msg->msg_len;
return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE;
}
/*
* got EOR.
* if mread_len is > 0 we are completing a packet from short reads
* complete reassembling the packet in mread_buf, copy it back in the iov
* and set the iov/msg len numbers (size) correctly
*/
if (listen_info->mread_len) {
/*
* add last fragment to mread_buf
*/
memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len);
listen_info->mread_len = listen_info->mread_len + msg->msg_len;
/*
* move all back into the iovec
*/
memmove(iov->iov_base, listen_info->mread_buf, listen_info->mread_len);
msg->msg_len = listen_info->mread_len;
listen_info->mread_len = 0;
}
return KNET_TRANSPORT_RX_IS_DATA;
}
if (!(msg->msg_hdr.msg_flags & MSG_EOR)) {
return KNET_TRANSPORT_RX_NOT_DATA_STOP;
}
for (i = 0; i < iovlen; i++) {
snp = iov[i].iov_base;
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
sac = &snp->sn_assoc_change;
switch (sac->sac_state) {
case SCTP_COMM_LOST:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_lost", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->close_sock = 1;
connect_info->link->transport_connected = 0;
}
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
return KNET_TRANSPORT_RX_OOB_DATA_STOP;
break;
case SCTP_COMM_UP:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_up", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->link->transport_connected = 1;
}
break;
case SCTP_RESTART:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: restart", sockfd);
break;
case SCTP_SHUTDOWN_COMP:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: shutdown comp", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->close_sock = 1;
}
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
return KNET_TRANSPORT_RX_OOB_DATA_STOP;
break;
case SCTP_CANT_STR_ASSOC:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: cant str assoc", sockfd);
sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0);
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: unknown %d", sockfd, sac->sac_state);
break;
}
break;
case SCTP_SHUTDOWN_EVENT:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event socket %d", sockfd);
if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) {
connect_info->link->transport_connected = 0;
connect_info->sock_shutdown = 1;
} else {
listen_info->link_info->sock_shutdown = 1;
}
break;
case SCTP_SEND_FAILED:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed socket: %d", sockfd);
break;
case SCTP_PEER_ADDR_CHANGE:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change socket %d", sockfd);
break;
case SCTP_REMOTE_ERROR:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error socket %d", sockfd);
break;
default:
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event socket: %d type: %hu", sockfd, snp->sn_header.sn_type);
break;
}
}
return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE;
}
int sctp_transport_link_is_down(knet_handle_t knet_h, struct knet_link *kn_link)
{
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *info = kn_link->transport_link;
kn_link->transport_connected = 0;
info->close_sock = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received a link down event", info->connect_sock);
if (sendto(handle_info->connectsockfd[1], &info->connect_sock, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno));
}
return 0;
}
/*
* connect / outgoing socket management thread
*/
/*
* _handle_connected_sctp* are called with a global write lock
* from the connect_thread
*/
static void _handle_connected_sctp_socket(knet_handle_t knet_h, int connect_sock)
{
int err;
unsigned int status, len = sizeof(status);
sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data;
struct knet_link *kn_link = info->link;
if (info->close_sock) {
if (_close_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp_socket: %s", connect_sock, strerror(errno));
return;
}
info->close_sock = 0;
if (_create_connect_socket(knet_h, kn_link) < 0) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno));
return;
}
}
_reconnect_socket(knet_h, info->link);
err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len);
if (err) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s",
connect_sock, strerror(errno));
return;
}
if (status) {
log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s",
connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port,
strerror(status));
/*
* No need to create a new socket if connect failed,
* just retry connect
*/
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s",
connect_sock,
kn_link->status.dst_ipaddr, kn_link->status.dst_port);
}
static void _handle_connected_sctp_notifications(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error");
return;
}
/*
* revalidate sockfd
*/
if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd);
_handle_connected_sctp_socket(knet_h, sockfd);
}
static void *_sctp_connect_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED);
memset(&events, 0, sizeof(events));
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
/*
* Sort out which FD has a connection
*/
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* minor optimization: deduplicate events
*
* in some cases we can receive multiple notifcations
* of the same FD having issues or need handling.
* It's enough to process it once even tho it's safe
* to handle them multiple times.
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->connectsockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket");
_handle_connected_sctp_notifications(knet_h);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification on connected sockfd %d\n", events[i].data.fd);
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
/*
* this thread can generate events for itself.
* we need to sleep in between loops to allow other threads
* to be scheduled
*/
usleep(knet_h->reconnect_int * 1000);
}
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED);
return NULL;
}
/*
* listen/incoming connections management thread
*/
/*
* Listener received a new connection
* called with a write lock from main thread
*/
static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock)
{
int err = 0, savederrno = 0;
int new_fd;
int i = -1;
sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data;
struct epoll_event ev;
struct sockaddr_storage ss;
socklen_t sock_len = sizeof(ss);
char addr_str[KNET_MAX_HOST_LEN];
char port_str[KNET_MAX_PORT_LEN];
sctp_accepted_link_info_t *accept_info = NULL;
+ struct knet_host *host;
+ struct knet_link *kn_link;
+ int link_idx;
+ sctp_connect_link_info_t *this_link_connect_info;
+ sctp_listen_link_info_t *this_link_listen_info;
+ int pass_acl = 0;
memset(&ss, 0, sizeof(ss));
new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len);
if (new_fd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno));
goto exit_error;
}
if (knet_addrtostr(&ss, sizeof(ss),
addr_str, KNET_MAX_HOST_LEN,
port_str, KNET_MAX_PORT_LEN) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info");
goto exit_error;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s",
addr_str, port_str);
+
if (knet_h->use_access_lists) {
- if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) {
+ for (host = knet_h->host_head; host != NULL; host = host->next) {
+ for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
+ kn_link = &host->link[link_idx];
+
+ if ((kn_link->configured) && (kn_link->transport == KNET_TRANSPORT_SCTP)) {
+ this_link_connect_info = kn_link->transport_link;
+ this_link_listen_info = this_link_connect_info->listener;
+ if ((this_link_listen_info->listen_sock == listen_sock) &&
+ (check_validate(knet_h, kn_link, &ss))) {
+ pass_acl = 1;
+ break;
+ }
+ }
+ }
+ if (pass_acl) {
+ break;
+ }
+ }
+ if (!pass_acl) {
savederrno = EINVAL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str);
close(new_fd);
errno = savederrno;
return;
}
}
/*
* Keep a track of all accepted FDs
*/
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] == -1) {
info->accepted_socks[i] = new_fd;
break;
}
}
if (i == MAX_ACCEPTED_SOCKS) {
errno = EBUSY;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!");
goto exit_error;
}
if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */
savederrno = errno;
err = -1;
goto exit_error;
}
if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
accept_info = malloc(sizeof(sctp_accepted_link_info_t));
if (!accept_info) {
savederrno = errno;
err = -1;
goto exit_error;
}
memset(accept_info, 0, sizeof(sctp_accepted_link_info_t));
accept_info->link_info = info;
if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO,
knet_h->knet_transport_fd_tracker[listen_sock].sockaddr_len,
accept_info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(errno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = new_fd;
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s",
new_fd, strerror(errno));
goto exit_error;
}
info->on_rx_epoll = 1;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d",
new_fd, addr_str, port_str, info->listen_sock, i);
exit_error:
if (err) {
if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) {
info->accepted_socks[i] = -1;
}
/*
* check the error to make coverity scan happy.
* _set_fd_tracker cannot fail at this stage
*/
if (_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0){
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", new_fd);
}
free(accept_info);
if (new_fd >= 0) {
close(new_fd);
}
}
errno = savederrno;
return;
}
/*
* Listen thread received a notification of a bad socket that needs closing
* called with a write lock from main thread
*/
static void _handle_listen_sctp_errors(knet_handle_t knet_h)
{
int sockfd = -1;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_accepted_link_info_t *accept_info;
sctp_listen_link_info_t *info;
struct knet_host *host;
int link_idx;
int i;
if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd");
return;
}
if (_is_valid_fd(knet_h, sockfd) < 1) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error");
return;
}
/*
* revalidate sockfd
*/
if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) {
return;
}
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd);
accept_info = knet_h->knet_transport_fd_tracker[sockfd].data;
info = accept_info->link_info;
/*
* clear all links using this accepted socket as
* outbound dynamically connected socket
*/
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(host->link[link_idx].outsock == sockfd)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)",
host->host_id, link_idx, sockfd);
host->link[link_idx].status.dynconnected = 0;
host->link[link_idx].transport_connected = 0;
host->link[link_idx].outsock = 0;
memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage));
}
}
}
for (i=0; i<MAX_ACCEPTED_SOCKS; i++) {
if (sockfd == info->accepted_socks[i]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd);
/*
* check the error to make coverity scan happy.
* _set_fd_tracker cannot fail at this stage
*/
if (_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", sockfd);
}
info->accepted_socks[i] = -1;
free(accept_info);
close(sockfd);
break; /* Keeps covscan happy */
}
}
}
static void *_sctp_listen_thread(void *data)
{
int savederrno;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED);
memset(&events, 0, sizeof(events));
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
/*
* we use timeout to detect if thread is shutting down
*/
if (nev == 0) {
continue;
}
if (nev < 0) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s",
strerror(errno));
continue;
}
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s",
strerror(savederrno));
continue;
}
/*
* Sort out which FD has an incoming connection
*/
for (i = 0; i < nev; i++) {
if (events[i].data.fd == handle_info->listensockfd[0]) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket");
_handle_listen_sctp_errors(knet_h);
} else {
if (_is_valid_fd(knet_h, events[i].data.fd) == 1) {
_handle_incoming_sctp(knet_h, events[i].data.fd);
} else {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket");
}
}
}
pthread_rwlock_unlock(&knet_h->global_rwlock);
}
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED);
return NULL;
}
/*
* sctp_link_listener_start/stop are called in global write lock
* context from set_config and clear_config.
*/
static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int listen_sock = -1;
struct epoll_event ev;
sctp_listen_link_info_t *info = NULL;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* Only allocate a new listener if src address is different
*/
qb_list_for_each_entry(info, &handle_info->listen_links_list, list) {
if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) {
- if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1,
+ if ((check_add(knet_h, kn_link, -1,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
return NULL;
}
return info;
}
}
info = malloc(sizeof(sctp_listen_link_info_t));
if (!info) {
err = -1;
goto exit_error;
}
memset(info, 0, sizeof(sctp_listen_link_info_t));
memset(info->accepted_socks, -1, sizeof(info->accepted_socks));
memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage));
listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP);
if (listen_sock < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (listen(listen_sock, 5) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s",
strerror(savederrno));
goto exit_error;
}
if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, sockaddr_len(&kn_link->src_addr), info) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
- if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1,
+ if ((check_add(knet_h, kn_link, -1,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s",
strerror(savederrno));
goto exit_error;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 1;
info->listen_sock = listen_sock;
qb_list_add(&info->list, &handle_info->listen_links_list);
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port);
exit_error:
if (err) {
if ((info) && (info->on_listener_epoll)) {
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev);
}
if (listen_sock >= 0) {
- check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP);
+ check_rmall(knet_h, kn_link);
close(listen_sock);
}
if (info) {
free(info);
info = NULL;
}
}
errno = savederrno;
return info;
}
static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
int found = 0, i;
struct knet_host *host;
int link_idx;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
sctp_connect_link_info_t *link_info;
struct epoll_event ev;
for (host = knet_h->host_head; host != NULL; host = host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if (&host->link[link_idx] == kn_link)
continue;
link_info = host->link[link_idx].transport_link;
if ((link_info) &&
(link_info->listener == info)) {
found = 1;
break;
}
}
}
- if ((check_rm(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP,
+ if ((check_rm(knet_h, kn_link,
&kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", info->listen_sock);
}
if (found) {
this_link_info->listener = NULL;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock);
savederrno = EBUSY;
err = -1;
goto exit_error;
}
if (info->on_listener_epoll) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->listen_sock;
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s",
strerror(savederrno));
goto exit_error;
}
info->on_listener_epoll = 0;
}
if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
- check_rmall(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP);
-
+ check_rmall(knet_h, kn_link);
close(info->listen_sock);
for (i=0; i< MAX_ACCEPTED_SOCKS; i++) {
if (info->accepted_socks[i] > -1) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = info->accepted_socks[i];
if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s",
strerror(errno));
}
info->on_rx_epoll = 0;
free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data);
close(info->accepted_socks[i]);
if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s",
strerror(savederrno));
goto exit_error;
}
info->accepted_socks[i] = -1;
}
}
qb_list_del(&info->list);
free(info);
this_link_info->listener = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* Links config/clear. Both called with global wrlock from link_set_config/clear_config
*/
int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int savederrno = 0, err = 0;
sctp_connect_link_info_t *info;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
info = malloc(sizeof(sctp_connect_link_info_t));
if (!info) {
goto exit_error;
}
memset(info, 0, sizeof(sctp_connect_link_info_t));
kn_link->transport_link = info;
info->link = kn_link;
memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage));
info->connect_sock = -1;
info->listener = sctp_link_listener_start(knet_h, kn_link);
if (!info->listener) {
savederrno = errno;
err = -1;
goto exit_error;
}
if (kn_link->dynamic == KNET_LINK_STATIC) {
if (_create_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
goto exit_error;
}
kn_link->outsock = info->connect_sock;
}
qb_list_add(&info->list, &handle_info->connect_links_list);
exit_error:
if (err) {
if (info) {
if (info->connect_sock >= 0) {
close(info->connect_sock);
}
if (info->listener) {
sctp_link_listener_stop(knet_h, kn_link);
}
kn_link->transport_link = NULL;
free(info);
}
}
errno = savederrno;
return err;
}
/*
* called with global wrlock
*/
int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link)
{
int err = 0, savederrno = 0;
sctp_connect_link_info_t *info;
if (!kn_link) {
errno = EINVAL;
return -1;
}
info = kn_link->transport_link;
if (!info) {
errno = EINVAL;
return -1;
}
if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener transport: %s",
strerror(savederrno));
goto exit_error;
}
if (_close_connect_socket(knet_h, kn_link) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s",
strerror(savederrno));
goto exit_error;
}
qb_list_del(&info->list);
free(info);
kn_link->transport_link = NULL;
exit_error:
errno = savederrno;
return err;
}
/*
* transport_free and transport_init are
* called only from knet_handle_new and knet_handle_free.
* all resources (hosts/links) should have been already freed at this point
* and they are called in a write locked context, hence they
* don't need their own locking.
*/
int sctp_transport_free(knet_handle_t knet_h)
{
sctp_handle_info_t *handle_info;
void *thread_status;
struct epoll_event ev;
if (!knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EINVAL;
return -1;
}
handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
/*
* keep it here while we debug list usage and such
*/
if (!qb_list_empty(&handle_info->listen_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty");
}
if (!qb_list_empty(&handle_info->connect_links_list)) {
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty");
}
if (handle_info->listen_thread) {
pthread_cancel(handle_info->listen_thread);
pthread_join(handle_info->listen_thread, &thread_status);
}
if (handle_info->connect_thread) {
pthread_cancel(handle_info->connect_thread);
pthread_join(handle_info->connect_thread, &thread_status);
}
if (handle_info->listensockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev);
}
if (handle_info->connectsockfd[0] >= 0) {
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev);
}
_close_socketpair(knet_h, handle_info->connectsockfd);
_close_socketpair(knet_h, handle_info->listensockfd);
if (handle_info->listen_epollfd >= 0) {
close(handle_info->listen_epollfd);
}
if (handle_info->connect_epollfd >= 0) {
close(handle_info->connect_epollfd);
}
free(handle_info->event_subscribe_buffer);
free(handle_info);
knet_h->transports[KNET_TRANSPORT_SCTP] = NULL;
return 0;
}
static int _sctp_subscribe_init(knet_handle_t knet_h)
{
int test_socket, savederrno;
sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP];
char dummy_events[100];
struct sctp_event_subscribe *events;
/* Below we set the first 6 fields of this expanding struct.
* SCTP_EVENTS is deprecated, but SCTP_EVENT is not available
* on Linux; on the other hand, FreeBSD and old Linux does not
* accept small transfers, so we can't simply use this minimum
* everywhere. Thus we query and store the native size. */
const unsigned int subscribe_min = 6;
test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP);
if (test_socket < 0) {
if (errno == EPROTONOSUPPORT) {
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization");
return 0;
}
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s",
strerror(savederrno));
return savederrno;
}
handle_info->event_subscribe_kernel_size = sizeof dummy_events;
if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events,
&handle_info->event_subscribe_kernel_size)) {
close(test_socket);
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s",
strerror(savederrno));
return savederrno;
}
close(test_socket);
if (handle_info->event_subscribe_kernel_size < subscribe_min) {
savederrno = ERANGE;
log_err(knet_h, KNET_SUB_TRANSP_SCTP,
"No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed",
handle_info->event_subscribe_kernel_size, subscribe_min);
return savederrno;
}
events = malloc(handle_info->event_subscribe_kernel_size);
if (!events) {
savederrno = errno;
log_err(knet_h, KNET_SUB_TRANSP_SCTP,
"Failed to allocate event subscribe buffer: %s", strerror(savederrno));
return savederrno;
}
memset(events, 0, handle_info->event_subscribe_kernel_size);
events->sctp_data_io_event = 1;
events->sctp_association_event = 1;
events->sctp_address_event = 1;
events->sctp_send_failure_event = 1;
events->sctp_peer_error_event = 1;
events->sctp_shutdown_event = 1;
handle_info->event_subscribe_buffer = (char *)events;
log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space",
handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe));
return 0;
}
int sctp_transport_init(knet_handle_t knet_h)
{
int err = 0, savederrno = 0;
sctp_handle_info_t *handle_info;
struct epoll_event ev;
if (knet_h->transports[KNET_TRANSPORT_SCTP]) {
errno = EEXIST;
return -1;
}
handle_info = malloc(sizeof(sctp_handle_info_t));
if (!handle_info) {
return -1;
}
memset(handle_info, 0,sizeof(sctp_handle_info_t));
knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info;
savederrno = _sctp_subscribe_init(knet_h);
if (savederrno) {
err = -1;
goto exit_fail;
}
qb_list_init(&handle_info->listen_links_list);
qb_list_init(&handle_info->connect_links_list);
handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->listen_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->listen_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
if (handle_info->connect_epollfd < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_fdset_cloexec(handle_info->connect_epollfd)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->connectsockfd[0];
if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s",
strerror(savederrno));
goto exit_fail;
}
memset(&ev, 0, sizeof(struct epoll_event));
ev.events = EPOLLIN;
ev.data.fd = handle_info->listensockfd[0];
if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s",
strerror(savederrno));
goto exit_fail;
}
/*
* Start connect & listener threads
*/
set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s",
strerror(savederrno));
goto exit_fail;
}
set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED);
savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h);
if (savederrno) {
err = -1;
log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s",
strerror(savederrno));
goto exit_fail;
}
exit_fail:
if (err < 0) {
sctp_transport_free(knet_h);
}
errno = savederrno;
return err;
}
int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link)
{
kn_link->outsock = sockfd;
kn_link->status.dynconnected = 1;
kn_link->transport_connected = 1;
return 0;
}
int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link)
{
sctp_connect_link_info_t *this_link_info = kn_link->transport_link;
sctp_listen_link_info_t *info = this_link_info->listener;
return info->listen_sock;
}
#endif

File Metadata

Mime Type
text/x-diff
Expires
Tue, Feb 25, 12:36 AM (22 h, 17 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1464559
Default Alt Text
(237 KB)

Event Timeline