diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index 4166e897..ad9008d2 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -1,663 +1,663 @@ /* * Copyright (C) 2016 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" #define MAX_NODES 128 static int senderid = -1; static knet_handle_t knet_h; static int datafd = 0; static int8_t channel = 0; static int globallistener = 0; static int continous = 0; static struct sockaddr_storage allv4; static struct sockaddr_storage allv6; static int broadcast_test = 1; -static pthread_t rx_thread = NULL; +static pthread_t rx_thread = (pthread_t)NULL; static char *rx_buf[PCKT_FRAG_MAX]; static int shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; #define TEST_PING 0 #define TEST_PING_AND_DATA 1 #define TEST_PERF 2 static int test_type = TEST_PING; struct node { int nodeid; int links; struct sockaddr_storage address[KNET_MAX_LINK]; }; static void print_help(void) { printf("knet_bench usage:\n"); printf(" -h print this help (no really)\n"); printf(" -d enable debug logs (default INFO)\n"); printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n"); printf(" Example: -c nss:aes128:sha1\n"); printf(" -p [active|passive|rr] (default: passive)\n"); printf(" -P [udp|sctp] (default: udp) protocol (transport) to use\n"); printf(" -t [nodeid] This nodeid (required)\n"); printf(" -n [nodeid],[link1_ip_addr],[link2_..] Other nodes information (at least one required)\n"); printf(" Example: -t 1,192.168.8.1,3ffe::8:1,..\n"); printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES); printf(" -b [port] baseport (default: 50000)\n"); printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n"); printf(" -o enable baseport offset per nodeid\n"); printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n"); printf(" -T [ping|ping_data|perf] test type (default: ping)\n"); printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n"); printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n"); printf(" perf: will wait for all hosts to join the knet network, perform a series of benchmarks and quit\n"); printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -C repeat the test continously (default: off)\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int thisnodeid, int *thisidx) { int i; char *temp = NULL; char port_str[10]; memset(port_str, 0, sizeof(port_str)); sprintf(port_str, "%d", port); for (i = 0; i < onidx; i++) { nodes[i].nodeid = atoi(strtok(nodesinfo[i], ",")); if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) { printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST); exit(FAIL); } if (thisnodeid == nodes[i].nodeid) { *thisidx = i; } while((temp = strtok(NULL, ","))) { if (nodes[i].links == KNET_MAX_LINK) { printf("Too many links configured. Max %d\n", KNET_MAX_LINK); exit(FAIL); } if (knet_strtoaddr(temp, port_str, &nodes[i].address[nodes[i].links], sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert %s to sockaddress\n", temp); exit(FAIL); } nodes[i].links++; } } if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert 0.0.0.0 to sockaddress\n"); exit(FAIL); } if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) { printf("Unable to convert :: to sockaddress\n"); exit(FAIL); } for (i = 1; i < onidx; i++) { if (nodes[0].links != nodes[i].links) { printf("knet_bench does not support unbalanced link configuration\n"); exit(FAIL); } } return; } static int private_data; static void sock_notify(void *pvt_data, int local_datafd, int8_t local_channel, uint8_t tx_rx, int error, int errorno) { printf("Error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errno), local_datafd); return; } static void setup_knet(int argc, char *argv[]) { int logfd; int rv; char *cryptocfg = NULL, *policystr = NULL, *protostr = NULL; char *othernodeinfo[MAX_NODES]; struct node nodes[MAX_NODES]; int thisnodeid = -1; int thisidx = -1; int onidx = 0; int debug = KNET_LOG_INFO; int port = 50000, portoffset = 0; int thisport = 0, otherport = 0; int thisnewport = 0, othernewport = 0; struct sockaddr_in *so_in; struct sockaddr_in6 *so_in6; struct sockaddr_storage *src; int i, link_idx, allnodesup = 0; int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0; int protocol = KNET_TRANSPORT_UDP, protofound = 0; int wait = 1; struct knet_handle_crypto_cfg knet_handle_crypto_cfg; char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL; memset(nodes, 0, sizeof(nodes)); while ((rv = getopt(argc, argv, "CT:s:ldowb:t:n:c:p:P:h")) != EOF) { switch(rv) { case 'h': print_help(); exit(PASS); break; case 'd': debug = KNET_LOG_DEBUG; break; case 'c': if (cryptocfg) { printf("Error: -c can only be specified once\n"); exit(FAIL); } cryptocfg = optarg; break; case 'p': if (policystr) { printf("Error: -p can only be specified once\n"); exit(FAIL); } policystr = optarg; if (!strcmp(policystr, "active")) { policy = KNET_LINK_POLICY_ACTIVE; policyfound = 1; } if (!strcmp(policystr, "rr")) { policy = KNET_LINK_POLICY_RR; policyfound = 1; } if (!strcmp(policystr, "passive")) { policy = KNET_LINK_POLICY_PASSIVE; policyfound = 1; } if (!policyfound) { printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr); exit(FAIL); } break; case 'P': if (protostr) { printf("Error: -P can only be specified once\n"); exit(FAIL); } protostr = optarg; if (!strcmp(protostr, "udp")) { protocol = KNET_TRANSPORT_UDP; protofound = 1; } if (!strcmp(protostr, "sctp")) { protocol = KNET_TRANSPORT_SCTP; protofound = 1; } if (!protofound) { printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr); exit(FAIL); } break; case 't': if (thisnodeid >= 0) { printf("Error: -t can only be specified once\n"); exit(FAIL); } thisnodeid = atoi(optarg); if ((thisnodeid < 0) || (thisnodeid > 65536)) { printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid); exit(FAIL); } break; case 'n': if (onidx == MAX_NODES) { printf("Error: too many other nodes. Max %d\n", MAX_NODES); exit(FAIL); } othernodeinfo[onidx] = optarg; onidx++; break; case 'b': port = atoi(optarg); if ((port < 1) || (port > 65536)) { printf("Error: port %d out of range (1 - 65536)\n", port); exit(FAIL); } case 'o': if (globallistener) { printf("Error: -l cannot be used with -o\n"); exit(FAIL); } portoffset = 1; break; case 'l': if (portoffset) { printf("Error: -o cannot be used with -l\n"); exit(FAIL); } globallistener = 1; break; case 'w': wait = 0; break; case 's': if (senderid >= 0) { printf("Error: -s can only be specified once\n"); exit(FAIL); } senderid = atoi(optarg); if ((senderid < 0) || (senderid > 65536)) { printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid); exit(FAIL); } break; case 'T': if (!strcmp("ping", optarg)) { test_type = TEST_PING; } if (!strcmp("ping_data", optarg)) { test_type = TEST_PING_AND_DATA; } if (!strcmp("perf", optarg)) { test_type = TEST_PERF; } break; case 'C': continous = 1; break; default: break; } } if (thisnodeid < 0) { printf("Who am I?!? missing -t from command line?\n"); exit(FAIL); } if (onidx < 1) { printf("no other nodes configured?!? missing -n from command line\n"); exit(FAIL); } parse_nodes(othernodeinfo, onidx, port, nodes, thisnodeid, &thisidx); if (thisidx < 0) { printf("no config for this node found\n"); exit(FAIL); } if (senderid >= 0) { for (i=0; i < onidx; i++) { if (senderid == nodes[i].nodeid) { break; } } if (i == onidx) { printf("Unable to find senderid in nodelist\n"); exit(FAIL); } } if ((test_type == TEST_PERF) && (senderid < 0)) { printf("Error: performance test requires -s to be set (for now)\n"); exit(FAIL); } logfd = start_logging(stdout); knet_h = knet_handle_new(thisnodeid, logfd, debug); if (!knet_h) { printf("Unable to knet_handle_new: %s\n", strerror(errno)); exit(FAIL); } if (cryptocfg) { memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg)); cryptomodel = strtok(cryptocfg, ":"); cryptotype = strtok(NULL, ":"); cryptohash = strtok(NULL, ":"); if (cryptomodel) { strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1); } if (cryptotype) { strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1); } if (cryptohash) { strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1); } knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN; if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) { printf("Unable to init crypto\n"); exit(FAIL); } } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); exit(FAIL); } for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if (knet_host_add(knet_h, nodes[i].nodeid) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); exit(FAIL); } if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) { printf("knet_host_set_policy failed: %s\n", strerror(errno)); exit(FAIL); } for (link_idx = 0; link_idx < nodes[i].links; link_idx++) { if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in->sin_port); thisnewport = thisport + nodes[i].nodeid; so_in->sin_port = (htons(thisnewport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; otherport = ntohs(so_in->sin_port); othernewport = otherport + nodes[thisidx].nodeid; so_in->sin_port = (htons(othernewport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; thisport = ntohs(so_in6->sin6_port); thisnewport = thisport + nodes[i].nodeid; so_in6->sin6_port = (htons(thisnewport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; otherport = ntohs(so_in6->sin6_port); othernewport = otherport + nodes[thisidx].nodeid; so_in6->sin6_port = (htons(othernewport)); } } if (!globallistener) { src = &nodes[thisidx].address[link_idx]; } else { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { src = &allv4; } else { src = &allv6; } } if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx, protocol, src, &nodes[i].address[link_idx]) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); exit(FAIL); } if (portoffset) { if (nodes[thisidx].address[link_idx].ss_family == AF_INET) { so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx]; so_in->sin_port = (htons(thisport)); so_in = (struct sockaddr_in *)&nodes[i].address[link_idx]; so_in->sin_port = (htons(otherport)); } else { so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx]; so_in6->sin6_port = (htons(thisport)); so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx]; so_in6->sin6_port = (htons(otherport)); } } if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); exit(FAIL); } } } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); exit(FAIL); } if (wait) { while(!allnodesup) { allnodesup = 1; for (i=0; i < onidx; i++) { if (i == thisidx) { continue; } if(knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) { printf("waiting host %d to be reachable\n", nodes[i].nodeid); allnodesup = 0; } } if (!allnodesup) { sleep(1); } } sleep(1); } } static int ping_dst_host_filter(void *pvt_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_host_id, int8_t *dst_channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries) { if (broadcast_test) { return 1; } if (tx_rx == KNET_NOTIFY_TX) { memmove(&dst_host_ids[0], outdata, 2); } else { dst_host_ids[0] = this_host_id; } *dst_host_ids_entries = 1; return 0; } static void *_rx_thread(void *args) { fd_set rfds; ssize_t len; struct timeval tv; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, msg_recv; for (i = 0; i < PCKT_FRAG_MAX; i++) { rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE); if (!rx_buf[i]) { printf("RXT: Unable to malloc!\n"); return NULL; } memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE); iov_in[i].iov_base = (void *)rx_buf[i]; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } select_loop: tv.tv_sec = 5; tv.tv_usec = 0; FD_ZERO(&rfds); FD_SET(datafd, &rfds); len = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); if (len < 0) { printf("RXT: Unable select over datafd\nHALTING RX THREAD!\n"); return NULL; } if (!len) { printf("RXT: No data for the past 5 seconds\n"); } if (FD_ISSET(datafd, &rfds)) { msg_recv = recvmmsg(datafd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { printf("RXT: error from recvmmsg: %s\n", strerror(errno)); } for (i = 0; i < msg_recv; i++) { if (msg[i].msg_len == 0) { printf("RXT: received 0 bytes message?\n"); } if (test_type == TEST_PING_AND_DATA) { printf("received %lu bytes message: %s\n", (long)msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base); } /* * do stats here */ } } goto select_loop; return NULL; } static void setup_data_txrx_common(void) { if (!rx_thread) { if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) { printf("Unable to enable dst_host_filter: %s\n", strerror(errno)); exit(FAIL); } printf("Setting up rx thread\n"); if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) { printf("Unable to start rx thread\n"); exit(FAIL); } } } static void stop_rx_thread(void) { void *retval; int i; if (rx_thread) { printf("Shutting down rx thread\n"); pthread_cancel(rx_thread); pthread_join(rx_thread, &retval); for (i = 0; i < PCKT_FRAG_MAX; i ++) { free(rx_buf[i]); } } } static void send_ping_data(void) { const char *buf = "Hello world!\x0"; ssize_t len = strlen(buf); if (knet_send(knet_h, buf, len, channel) != len) { printf("Error sending hello world: %s\n", strerror(errno)); } sleep(1); } static void cleanup_all(void) { if (pthread_mutex_lock(&shutdown_mutex)) { return; } if (shutdown_in_progress) { pthread_mutex_unlock(&shutdown_mutex); return; } shutdown_in_progress = 1; pthread_mutex_unlock(&shutdown_mutex); if (rx_thread) { stop_rx_thread(); } knet_handle_stop(knet_h); } static void sigint_handler(int signum) { printf("Cleaning up... got signal: %d\n", signum); cleanup_all(); exit(PASS); } int main(int argc, char *argv[]) { if (signal(SIGINT, sigint_handler) == SIG_ERR) { printf("Unable to configure SIGINT handler\n"); exit(FAIL); } need_root(); setup_knet(argc, argv); restart: switch(test_type) { default: case TEST_PING: /* basic ping, no data */ sleep(5); break; case TEST_PING_AND_DATA: setup_data_txrx_common(); send_ping_data(); break; case TEST_PERF: setup_data_txrx_common(); break; } if (continous) { goto restart; } cleanup_all(); return PASS; } diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index ffa070b5..81e4476e 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -1,1323 +1,1323 @@ #include "config.h" #include #include #include #include #include #include #include #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" #include "threads_common.h" #ifdef HAVE_NETINET_SCTP_H #include /* * https://en.wikipedia.org/wiki/SCTP_packet_structure */ #define KNET_PMTUD_SCTP_OVERHEAD_COMMON 12 #define KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK 16 #define KNET_PMTUD_SCTP_OVERHEAD KNET_PMTUD_SCTP_OVERHEAD_COMMON + KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK /* * Time to sleep before reconnection attempts. in microseconds */ #define KNET_SCTP_SLEEP_TIME 1000000 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_handle_info { struct knet_list_head listen_links_list; struct knet_list_head connect_links_list; int connect_epollfd; int connectsockfd[2]; int listen_epollfd; int listensockfd[2]; pthread_t connect_thread; pthread_t listen_thread; } sctp_handle_info_t; /* * use by fd_tracker data type */ #define SCTP_NO_LINK_INFO 0 #define SCTP_LISTEN_LINK_INFO 1 #define SCTP_CONNECT_LINK_INFO 2 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_listen_link_info { struct knet_list_head list; int listen_sock; int accepted_socks[MAX_ACCEPTED_SOCKS]; struct sockaddr_storage src_address; int on_listener_epoll; int on_rx_epoll; } sctp_listen_link_info_t; typedef struct sctp_connect_link_info { struct knet_list_head list; sctp_listen_link_info_t *listener; struct knet_link *link; struct sockaddr_storage dst_address; int connect_sock; int on_connected_epoll; int on_rx_epoll; int close_sock; } sctp_connect_link_info_t; /* * socket handling functions * * those functions do NOT perform locking. locking * should be handled in the right context from callers */ /* * sockets are removed from rx_epoll from callers * see also error handling functions */ static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; if (info->on_connected_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = info->connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from the epoll pool: %s", strerror(errno)); goto exit_error; } info->on_connected_epoll = 0; } exit_error: if (info->connect_sock != -1) { if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->connect_sock); info->connect_sock = -1; } errno = savederrno; return err; } static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; struct sctp_event_subscribe events; memset(&events, 0, sizeof (events)); events.sctp_data_io_event = 1; events.sctp_association_event = 1; events.sctp_send_failure_event = 1; events.sctp_address_event = 1; events.sctp_peer_error_event = 1; events.sctp_shutdown_event = 1; if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, &events, sizeof (events)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s", type, strerror(savederrno)); } errno = savederrno; return err; } static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0, savederrno = 0; int value; int level; #ifdef SOL_SCTP level = SOL_SCTP; #else level = IPPROTO_SCTP; #endif if (_configure_transport_socket(knet_h, sock, address, type) < 0) { savederrno = errno; err = -1; goto exit_error; } value = 1; if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s", strerror(savederrno)); goto exit_error; } value = 1; if (setsockopt(sock, level, SCTP_DISABLE_FRAGMENTS, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp disable fragments: %s", strerror(savederrno)); goto exit_error; } if (_enable_sctp_notifications(knet_h, sock, type) < 0) { savederrno = errno; err = -1; } exit_error: errno = savederrno; return err; } -static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *link) +static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; - sctp_connect_link_info_t *info = link->transport_link; + sctp_connect_link_info_t *info = kn_link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; - if (connect(info->connect_sock, (struct sockaddr *)&link->dst_addr, sockaddr_len(&link->dst_addr)) < 0) { + if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) { if ((errno != EALREADY) && (errno != EINPROGRESS) && (errno != EISCONN)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s", info->connect_sock, strerror(savederrno)); goto exit_error; } } if (!info->on_connected_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = info->connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add send/recv to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_connected_epoll = 1; } exit_error: errno = savederrno; return err; } -static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *link) +static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; - sctp_connect_link_info_t *info = link->transport_link; + sctp_connect_link_info_t *info = kn_link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; int connect_sock; - connect_sock = socket(link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); + connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (connect_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s", strerror(savederrno)); goto exit_error; } - if (_configure_sctp_socket(knet_h, connect_sock, &link->dst_addr, "SCTP connect") < 0) { + if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, "SCTP connect") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->connect_sock = connect_sock; info->close_sock = 0; - if (_reconnect_socket(knet_h, link) < 0) { + if (_reconnect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; goto exit_error; } exit_error: if (err) { if (info->on_connected_epoll) { epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev); } if (connect_sock >= 0) { close(connect_sock); } } errno = savederrno; return err; } /* * socket error management functions * * both called with global read lock. * * NOTE: we need to remove the fd from the epoll as soon as possible * even before we notify the respective thread to take care of it * because scheduling can make it so that this thread will overload * and the threads supposed to take care of the error will never * be able to take action. * we CANNOT handle FDs here diretly (close/reconnect/etc) due * to locking context. We need to delegate that to their respective * management threads within global write lock. * * this function is called from: * - RX thread with recv_err <= 0 directly on recvmmsg error * - transport_rx_is_data when msg_len == 0 (recv_err = 1) * - transport_rx_is_data on notification (recv_err = 2) * * basically this small abouse of recv_err is to detect notifications * generated by sockets created by listen(). */ static int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { struct epoll_event ev; sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: /* * all connect link have notifications enabled * and we accept only data from notification and * generic recvmmsg errors. * * Errors generated by msg_len 0 can be ignored because * they follow a notification (double notification) */ if (recv_err != 1) { connect_info->link->transport_connected = 0; if (connect_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } connect_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd); if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno)); } } break; case SCTP_LISTEN_LINK_INFO: if (listen_info->listen_sock != sockfd) { if (recv_err != 1) { if (listen_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } listen_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd); if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno)); } } } else { /* * this means the listen() socket has generated * a notification. now what? :-) */ log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd); } break; default: break; } return 0; } /* * NOTE: sctp_transport_rx_is_data is called with global rdlock * delegate any FD error management to sctp_transport_rx_sock_error * and keep this code to parsing incoming data only */ static int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct mmsghdr msg) { int i; struct iovec *iov = msg.msg_hdr.msg_iov; size_t iovlen = msg.msg_hdr.msg_iovlen; struct sctp_assoc_change *sac; union sctp_notification *snp; if (!(msg.msg_hdr.msg_flags & MSG_NOTIFICATION)) { if (msg.msg_len == 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "received 0 bytes len packet: %d", sockfd); /* * NOTE: with event notification enabled, we receive error twice: * 1) from the event notification * 2) followed by a 0 byte msg_len * * This is generally not a problem if not for causing extra * handling for the same issue. Should we drop notifications * and keep the code generic (handle all errors via msg_len = 0) * or keep the duplication as safety measure, or drop msg_len = 0 * handling (what about sockets without events enabled?) */ sctp_transport_rx_sock_error(knet_h, sockfd, 1, 0); return 1; } return 2; } if (!(msg.msg_hdr.msg_flags & MSG_EOR)) { return 1; } for (i=0; i< iovlen; i++) { snp = iov[i].iov_base; switch (snp->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change"); sac = &snp->sn_assoc_change; if (sac->sac_state == SCTP_COMM_LOST) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change: comm_lost"); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); } break; case SCTP_SHUTDOWN_EVENT: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event"); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); break; case SCTP_SEND_FAILED: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed"); break; case SCTP_PEER_ADDR_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change"); break; case SCTP_REMOTE_ERROR: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error"); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event type: %hu\n", snp->sn_header.sn_type); break; } } return 0; } /* * connect / outgoing socket management thread */ /* * _handle_connected_sctp* are called with a global write lock * from the connect_thread */ static void _handle_connected_sctp(knet_handle_t knet_h, int connect_sock) { int err; struct epoll_event ev; unsigned int status, len = sizeof(status); sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data; - struct knet_link *link = info->link; + struct knet_link *kn_link = info->link; err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len); if (err) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s", connect_sock, strerror(errno)); return; } if (info->close_sock) { - if (_close_connect_socket(knet_h, link) < 0) { + if (_close_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp: %s", connect_sock, strerror(errno)); return; } info->close_sock = 0; - if (_create_connect_socket(knet_h, link) < 0) { + if (_create_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno)); return; } } if (status) { log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s", - connect_sock, link->status.dst_ipaddr, link->status.dst_port, + connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port, strerror(status)); /* * No need to create a new socket if connect failed, * just retry connect */ _reconnect_socket(knet_h, info->link); return; } /* * Connected - Remove us from the connect epoll */ memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket %d from epoll pool: %s", connect_sock, strerror(errno)); } info->on_connected_epoll = 0; - link->transport_connected = 1; - link->outsock = info->connect_sock; + kn_link->transport_connected = 1; + kn_link->outsock = info->connect_sock; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s", connect_sock, - link->status.dst_ipaddr, link->status.dst_port); + kn_link->status.dst_ipaddr, kn_link->status.dst_port); } static void _handle_connected_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *info; if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error"); return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd); info = knet_h->knet_transport_fd_tracker[sockfd].data; info->close_sock = 1; info->link->transport_connected = 0; _reconnect_socket(knet_h, info->link); } static void *_sctp_connect_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s", strerror(errno)); continue; } /* * Sort out which FD has a connection */ savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * minor optimization: deduplicate events * * in some cases we can receive multiple notifcations * of the same FD having issues or need handling. * It's enough to process it once even tho it's safe * to handle them multiple times. */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->connectsockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket"); _handle_connected_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_connected_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for dead fd %d\n", events[i].data.fd); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); /* * this thread can generate events for itself. * we need to sleep in between loops to allow other threads * to be scheduled */ usleep(KNET_SCTP_SLEEP_TIME); } return NULL; } /* * listen/incoming connections management thread */ /* * Listener received a new connection * called with a write lock from main thread */ static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock) { int err = 0, savederrno = 0; int new_fd; int i = -1; sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data; struct epoll_event ev; struct sockaddr_storage ss; socklen_t sock_len = sizeof(ss); char addr_str[KNET_MAX_HOST_LEN]; char port_str[KNET_MAX_PORT_LEN]; new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len); if (new_fd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno)); goto exit_error; } if (knet_addrtostr(&ss, sizeof(ss), addr_str, KNET_MAX_HOST_LEN, port_str, KNET_MAX_PORT_LEN) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info"); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s", addr_str, port_str); /* * Keep a track of all accepted FDs */ for (i=0; iaccepted_socks[i] == -1) { info->accepted_socks[i] = new_fd; break; } } if (i == MAX_ACCEPTED_SOCKS) { errno = EBUSY; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!"); goto exit_error; } if (_configure_common_socket(knet_h, new_fd, "SCTP incoming") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_LISTEN_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(errno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = new_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s", new_fd, strerror(errno)); goto exit_error; } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d", new_fd, addr_str, port_str, info->listen_sock, i); exit_error: if (err) { if ((i >= 0) || (i < MAX_ACCEPTED_SOCKS)) { info->accepted_socks[i] = -1; } _set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); close(new_fd); } errno = savederrno; return; } /* * Listen thread received a notification of a bad socket that needs closing * called with a write lock from main thread */ static void _handle_listen_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_listen_link_info_t *info; int i; if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error"); return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd); info = knet_h->knet_transport_fd_tracker[sockfd].data; for (i=0; iaccepted_socks[i]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd); _set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); info->accepted_socks[i] = -1; close(sockfd); } } } static void *_sctp_listen_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s", strerror(errno)); continue; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * Sort out which FD has an incoming connection */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->listensockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket"); _handle_listen_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_incoming_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket"); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } /* * sctp_link_listener_start/stop are called in global write lock * context from set_config and clear_config. */ static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; int listen_sock = -1; struct epoll_event ev; sctp_listen_link_info_t *info = NULL; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * Only allocate a new listener if src address is different */ knet_list_for_each_entry(info, &handle_info->listen_links_list, list) { if (memcmp(&info->src_address, &link->src_addr, sizeof(struct sockaddr_storage)) == 0) { return info; } } info = malloc(sizeof(sctp_listen_link_info_t)); if (!info) { err = -1; goto exit_error; } memset(info, 0, sizeof(sctp_listen_link_info_t)); memset(info->accepted_socks, -1, sizeof(info->accepted_socks)); memcpy(&info->src_address, &link->src_addr, sizeof(struct sockaddr_storage)); listen_sock = socket(link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (listen_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, listen_sock, &link->src_addr, "SCTP listener") < 0) { savederrno = errno; err = -1; goto exit_error; } if (bind(listen_sock, (struct sockaddr *)&link->src_addr, sockaddr_len(&link->src_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } if (listen(listen_sock, 5) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTEN_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 1; info->listen_sock = listen_sock; knet_list_add(&info->list, &handle_info->listen_links_list); log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, link->status.src_ipaddr, link->status.src_port); exit_error: if (err) { if (info->on_listener_epoll) { epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev); } if (listen_sock >= 0) { close(listen_sock); } if (info) { free(info); info = NULL; } } errno = savederrno; return info; } static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; int found = 0, i; struct knet_host *host; int link_idx; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *this_link_info = link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; sctp_connect_link_info_t *link_info; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&host->link[link_idx] == link) continue; link_info = host->link[link_idx].transport_link; if ((link_info) && (link_info->listener == info) && (host->link[link_idx].status.enabled == 1)) { found = 1; break; } } } if (found) { this_link_info->listener = NULL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_listener_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 0; } if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->listen_sock); for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { if (info->accepted_socks[i] > -1) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->accepted_socks[i]; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 0; close(info->accepted_socks[i]); if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->accepted_socks[i] = -1; } } knet_list_del(&info->list); free(info); this_link_info->listener = NULL; exit_error: errno = savederrno; return err; } /* * Links config/clear. Both called with global wrlock from link_set_config/clear_config */ static int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *link) { int savederrno = 0, err = 0; sctp_connect_link_info_t *info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; info = malloc(sizeof(sctp_connect_link_info_t)); if (!info) { goto exit_error; } memset(info, 0, sizeof(sctp_connect_link_info_t)); link->transport_link = info; info->link = link; memcpy(&info->dst_address, &link->dst_addr, sizeof(struct sockaddr_storage)); info->on_connected_epoll = 0; info->connect_sock = -1; info->listener = sctp_link_listener_start(knet_h, link); if (!info->listener) { savederrno = errno; err = -1; goto exit_error; } if (_create_connect_socket(knet_h, link) < 0) { savederrno = errno; err = -1; goto exit_error; } knet_list_add(&info->list, &handle_info->connect_links_list); link->outsock = info->connect_sock; exit_error: if (err) { if (info) { if (info->connect_sock) { close(info->connect_sock); } if (info->listener) { sctp_link_listener_stop(knet_h, link); } link->transport_link = NULL; free(info); } } errno = savederrno; return err; } /* * called with global wrlock * FIX exit path error handling */ static int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info; struct epoll_event ev; if (!link) { errno = EINVAL; return -1; } info = link->transport_link; if (!info) { errno = EINVAL; return -1; } if ((sctp_link_listener_stop(knet_h, link) <0) && (errno != EBUSY)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener trasport: %s", strerror(savederrno)); goto exit_error; } if (info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_rx_epoll = 0; } if (_close_connect_socket(knet_h, link) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s", strerror(savederrno)); goto exit_error; } knet_list_del(&info->list); free(info); link->transport_link = NULL; exit_error: errno = savederrno; return err; } /* * transport_free and transport_init are * called only from knet_handle_new and knet_handle_free. * all resources (hosts/links) should have been already freed at this point * and they are called in a write locked context, hence they * don't need their own locking. */ static int sctp_transport_free(knet_handle_t knet_h) { sctp_handle_info_t *handle_info; void *thread_status; struct epoll_event ev; if (!knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * keep it here while we debug list usage and such */ if (!knet_list_empty(&handle_info->listen_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty"); } if (!knet_list_empty(&handle_info->connect_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty"); } if (handle_info->listen_thread) { pthread_cancel(handle_info->listen_thread); pthread_join(handle_info->listen_thread, &thread_status); } if (handle_info->connect_thread) { pthread_cancel(handle_info->connect_thread); pthread_join(handle_info->connect_thread, &thread_status); } if (handle_info->listensockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev); } if (handle_info->connectsockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev); } _close_socketpair(knet_h, handle_info->connectsockfd); _close_socketpair(knet_h, handle_info->listensockfd); if (handle_info->listen_epollfd >= 0) { close(handle_info->listen_epollfd); } if (handle_info->connect_epollfd >= 0) { close(handle_info->connect_epollfd); } free(handle_info); knet_h->transports[KNET_TRANSPORT_SCTP] = NULL; return 0; } static int sctp_transport_init(knet_handle_t knet_h) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info; struct epoll_event ev; if (knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(sctp_handle_info_t)); if (!handle_info) { return -1; } memset(handle_info, 0,sizeof(sctp_handle_info_t)); knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info; knet_list_init(&handle_info->listen_links_list); knet_list_init(&handle_info->connect_links_list); handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->listen_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->listen_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s", strerror(savederrno)); goto exit_fail; } handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->connect_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->connect_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s", strerror(savederrno)); goto exit_fail; } /* * Start connect & listener threads */ savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s", strerror(savederrno)); goto exit_fail; } exit_fail: if (err < 0) { sctp_transport_free(knet_h); } errno = savederrno; return err; } static knet_transport_ops_t sctp_transport_ops = { .transport_name = "SCTP", .transport_id = KNET_TRANSPORT_SCTP, .transport_mtu_overhead = KNET_PMTUD_SCTP_OVERHEAD, .transport_init = sctp_transport_init, .transport_free = sctp_transport_free, .transport_link_set_config = sctp_transport_link_set_config, .transport_link_clear_config = sctp_transport_link_clear_config, .transport_rx_sock_error = sctp_transport_rx_sock_error, .transport_rx_is_data = sctp_transport_rx_is_data, }; knet_transport_ops_t *get_sctp_transport() { return &sctp_transport_ops; } #else // HAVE_NETINET_SCTP_H knet_transport_ops_t *get_sctp_transport() { return NULL; } #endif diff --git a/libknet/transport_udp.c b/libknet/transport_udp.c index 92973d6a..bcee17e1 100644 --- a/libknet/transport_udp.c +++ b/libknet/transport_udp.c @@ -1,268 +1,268 @@ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" #define KNET_PMTUD_UDP_OVERHEAD 8 typedef struct udp_handle_info { struct knet_list_head links_list; } udp_handle_info_t; typedef struct udp_link_info { struct knet_list_head list; struct sockaddr_storage local_address; int socket_fd; int on_epoll; } udp_link_info_t; -static int udp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *link) +static int udp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int sock = -1; struct epoll_event ev; udp_link_info_t *info; udp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_UDP]; /* * Only allocate a new link if the local address is different */ knet_list_for_each_entry(info, &handle_info->links_list, list) { - if (memcmp(&info->local_address, &link->src_addr, sizeof(struct sockaddr_storage)) == 0) { + if (memcmp(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "Re-using existing UDP socket for new link"); - link->outsock = info->socket_fd; - link->transport_link = info; - link->transport_connected = 1; + kn_link->outsock = info->socket_fd; + kn_link->transport_link = info; + kn_link->transport_connected = 1; return 0; } } info = malloc(sizeof(udp_link_info_t)); if (!info) { err = -1; goto exit_error; } - sock = socket(link->src_addr.ss_family, SOCK_DGRAM, 0); + sock = socket(kn_link->src_addr.ss_family, SOCK_DGRAM, 0); if (sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_LISTENER, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } - if (_configure_transport_socket(knet_h, sock, &link->src_addr, "UDP") < 0) { + if (_configure_transport_socket(knet_h, sock, &kn_link->src_addr, "UDP") < 0) { savederrno = errno; err = -1; goto exit_error; } - if (bind(sock, (struct sockaddr *)&link->src_addr, sockaddr_len(&link->src_addr))) { + if (bind(sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr))) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_epoll = 1; if (_set_fd_tracker(knet_h, sock, KNET_TRANSPORT_UDP, 0, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } - memcpy(&info->local_address, &link->src_addr, sizeof(struct sockaddr_storage)); + memcpy(&info->local_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)); info->socket_fd = sock; knet_list_add(&info->list, &handle_info->links_list); - link->outsock = sock; - link->transport_link = info; - link->transport_connected = 1; + kn_link->outsock = sock; + kn_link->transport_link = info; + kn_link->transport_connected = 1; exit_error: if (err) { if (info) { if (info->on_epoll) { epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sock, &ev); } free(info); } if (sock >= 0) { close(sock); } } errno = savederrno; return err; } -static int udp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *link) +static int udp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int found = 0; struct knet_host *host; int link_idx; - udp_link_info_t *info = link->transport_link; + udp_link_info_t *info = kn_link->transport_link; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { - if (&host->link[link_idx] == link) + if (&host->link[link_idx] == kn_link) continue; if ((host->link[link_idx].transport_link == info) && (host->link[link_idx].status.enabled == 1)) { found = 1; break; } } } if (found) { log_debug(knet_h, KNET_SUB_TRANSP_UDP, "UDP socket %d still in use", info->socket_fd); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->socket_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->socket_fd, &ev) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to remove UDP socket from epoll poll: %s", strerror(errno)); goto exit_error; } info->on_epoll = 0; } if (_set_fd_tracker(knet_h, info->socket_fd, KNET_MAX_TRANSPORTS, 0, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_UDP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->socket_fd); knet_list_del(&info->list); - free(link->transport_link); + free(kn_link->transport_link); exit_error: errno = savederrno; return err; } static int udp_transport_free(knet_handle_t knet_h) { udp_handle_info_t *handle_info; if (!knet_h->transports[KNET_TRANSPORT_UDP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_UDP]; /* * keep it here while we debug list usage and such */ if (!knet_list_empty(&handle_info->links_list)) { log_err(knet_h, KNET_SUB_TRANSP_UDP, "Internal error. handle list is not empty"); return -1; } free(handle_info); knet_h->transports[KNET_TRANSPORT_UDP] = NULL; return 0; } static int udp_transport_init(knet_handle_t knet_h) { udp_handle_info_t *handle_info; if (knet_h->transports[KNET_TRANSPORT_UDP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(udp_handle_info_t)); if (!handle_info) { return -1; } knet_h->transports[KNET_TRANSPORT_UDP] = handle_info; knet_list_init(&handle_info->links_list); return 0; } static int udp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { /* * UDP can't do much here afaict, perhaps clear the SO_ERROR? */ log_err(knet_h, KNET_SUB_TRANSP_UDP, "Sock: %d received error: %d (%s)", sockfd, recv_err, strerror(recv_errno)); return 0; } static int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct mmsghdr msg) { if (msg.msg_len == 0) return 0; return 2; } static knet_transport_ops_t udp_transport_ops = { .transport_name = "UDP", .transport_id = KNET_TRANSPORT_UDP, .transport_mtu_overhead = KNET_PMTUD_UDP_OVERHEAD, .transport_init = udp_transport_init, .transport_free = udp_transport_free, .transport_link_set_config = udp_transport_link_set_config, .transport_link_clear_config = udp_transport_link_clear_config, .transport_rx_sock_error = udp_transport_rx_sock_error, .transport_rx_is_data = udp_transport_rx_is_data, }; knet_transport_ops_t *get_udp_transport() { return &udp_transport_ops; }