diff --git a/TODO b/TODO index 1e3cbd22..77ba1f1d 100644 --- a/TODO +++ b/TODO @@ -1,100 +1,103 @@ distro/packaging: - (issue) check why systemd doesn't let me set scheduler priority fixed for systemd unit file, LSB init + systemd still fails https://bugzilla.redhat.com/show_bug.cgi?id=893015 link/host level: + - (issue) SCTP: in kernel fragmentation appears to be broken and needs + some investigation. workaround in place by disabling + in-kernel SCTP fragmentation - (issue) transports: plumb TX error handling on per-transport hooks - (issue) transports: review all error handling code failures, specially on clear_config, to not leak and better clean in case of errors - (issue) improve test suite to cover for all transports (needs transport list API) - (issue) UDP/SCTP support for dynamic links (aka no dst_address) this one needs plumbing inside RX thread - (issue) change knet_bench to allow protocol specification per link - (issue) review memory locking across - (issue) simplify handling of DATA and HOSTINFO code paths in send/recv code - (issue) need bind to interface for dynamic ip local interfaces vs src ip address or find a way to autodetect the new ip on that interface (listen to kernel netlink?) - (issue) must implement link auth via user/passwd. This is necessary in case key is leaked. - (issue) standardize exit labels (ex out_unlock / exit_unlock) and variable/function names. - (issue) review how TX onwire pckt info are filled in between inbuf/socket/frags - (rfe) add fd_tracker error exit check and perhaps use it for local sockets too to make it easier to identify leaks and fd abuse - (rfe) add 2 new APIs: addr2str / str2addr and transport list query - (rfe) link status callback notification - (rfe) compress: should only compress user data, we will add a bit in the data header to indicate if the pckt is compressed or not (save time). this approach allow runtime change of compress. open questions are: methods? level? zlib? lzo? bz? lzma? xz? how much do we save by compressin our header? compress must happen before encrypt we can express compress data in packet type without adding extra flags to the headers. DATA -> BZ/GZDATA and we can change that right before encrypting. Using a similar approach to PING_MASK - (rfe) crypto: expand API to support dual key for rekey process - (rfe) link id made optional? right now we need the link id to match on both sides of the connection. this is somewhat annoying from a user perspective. Evaluate if we can make it optional. - (rfe) make hostid autogenerated in a consistent way? - (rfe) Check IPV6_NEXTHOP for v6 sockets and find equivalent for v4 (Jesper?) this would allow using one IP address as destination via multiple links - (rfe) add statistics at different levels (pckt per host/link, bytes, crypto overhead, frame overhead, pure data...) - (rfe) link connection access-list (chrissie has working generic code for this one, needs merging and API) - (rfe) improve host-to-host communication. Right now I am not satisfied with the current implementation, even if it works. - (rfe) implement link switching via scoring system based on: 1) latency 2) priority (auto/manual) 3) usage (over XX% traffic start RR) 4) flapping of the links (time/sec) this requires complex rules setting and a super efficent way to look up destination links 5) if links are stable, reduce the number of links in a-a min 2 - (rfe) benchmark tests for all critical paths in switching threads - (rfe) network convergence protocol (host exchange) - (rfe) reswitching of packets - (rfe) look into UDP+ECN bit set to avoid overloading sockets? - (rfe) add openssl support? - (rfe) consider adding threadpools to process data packets in parallel libknet: - (issue) review logging policy/levels in public api call example is scanning for active links in a host that would return a half gazzillion useless log entries - (issue) add .3 man pages libtap: - (issue) add .3 man pages - (issue) improve tests to cover thread safety and better error codes specially from the up/down handling. - (rfe) consider adding dhcp support for tap device it can be done now via up.d/ scripts, but it's not intuitive kronostnetd: - (issue) beside the code that is as bad as it can possibly be and will make you wish to have a tea spoon handy to carve your eyeballs out, the vty needs a good clean/rewrite - (issue) fix config file format. current one will make you scream - (issue) missing output from several command execution failures in vty mode - (issue) fix check_param for ip/prefix/crypto (this is part of the rewrite as it needs more clever arg parsing code/method) - (rfe) add logging config (per subsystem/global) - (rfe) split vty_cmd_files to be smaller. it's just too big to handle nicely. - (rfe) add equivalent of "description: ...." to various levels - (rfe) add optional options. right now it's necessary to specify everything all the time. - (rfe) implement tab completion on options general: - (issue) missing unit tests on many many bits - (issue) missing docs of all kind, devel, users, admin guide. - (rfe) check code with coverity diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index 03a6a101..88655e34 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -1,1303 +1,1312 @@ #include "config.h" #include #include #include #include #include #include #include #include #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" #include "threads_common.h" #ifdef HAVE_NETINET_SCTP_H #include /* * https://en.wikipedia.org/wiki/SCTP_packet_structure */ #define KNET_PMTUD_SCTP_OVERHEAD_COMMON 12 #define KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK 16 #define KNET_PMTUD_SCTP_OVERHEAD KNET_PMTUD_SCTP_OVERHEAD_COMMON + KNET_PMTUD_SCTP_OVERHEAD_DATA_CHUNK /* * Time to sleep before reconnection attempts. in microseconds */ #define KNET_SCTP_SLEEP_TIME 1000000 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_handle_info { struct knet_list_head listen_links_list; struct knet_list_head connect_links_list; int connect_epollfd; int connectsockfd[2]; int listen_epollfd; int listensockfd[2]; pthread_t connect_thread; pthread_t listen_thread; } sctp_handle_info_t; /* * use by fd_tracker data type */ #define SCTP_NO_LINK_INFO 0 #define SCTP_LISTEN_LINK_INFO 1 #define SCTP_CONNECT_LINK_INFO 2 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_listen_link_info { struct knet_list_head list; int listen_sock; int accepted_socks[MAX_ACCEPTED_SOCKS]; struct sockaddr_storage src_address; int on_listener_epoll; int on_rx_epoll; } sctp_listen_link_info_t; typedef struct sctp_connect_link_info { struct knet_list_head list; sctp_listen_link_info_t *listener; struct knet_link *link; struct sockaddr_storage dst_address; int connect_sock; int on_connected_epoll; int on_rx_epoll; int close_sock; } sctp_connect_link_info_t; /* * socket handling functions * * those functions do NOT perform locking. locking * should be handled in the right context from callers */ /* * sockets are removed from rx_epoll from callers * see also error handling functions */ static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; if (info->on_connected_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = info->connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from the epoll pool: %s", strerror(errno)); goto exit_error; } info->on_connected_epoll = 0; } exit_error: if (info->connect_sock != -1) { if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->connect_sock); info->connect_sock = -1; } errno = savederrno; return err; } static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; struct sctp_event_subscribe events; memset(&events, 0, sizeof (events)); events.sctp_data_io_event = 1; events.sctp_association_event = 1; events.sctp_send_failure_event = 1; events.sctp_address_event = 1; events.sctp_peer_error_event = 1; events.sctp_shutdown_event = 1; if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, &events, sizeof (events)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s", type, strerror(savederrno)); } errno = savederrno; return err; } static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0, savederrno = 0; int value; if (_configure_transport_socket(knet_h, sock, address, type) < 0) { savederrno = errno; err = -1; goto exit_error; } value = 1; if (setsockopt(sock, SOL_SCTP, SCTP_NODELAY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s", strerror(savederrno)); goto exit_error; } + value = 1; + if (setsockopt(sock, SOL_SCTP, SCTP_DISABLE_FRAGMENTS, &value, sizeof(value)) < 0) { + savederrno = errno; + err = -1; + log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp disable fragments: %s", + strerror(savederrno)); + goto exit_error; + } + if (_enable_sctp_notifications(knet_h, sock, type) < 0) { savederrno = errno; err = -1; } exit_error: errno = savederrno; return err; } static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; if (connect(info->connect_sock, (struct sockaddr *)&link->dst_addr, sizeof(struct sockaddr_storage)) < 0) { if ((errno != EALREADY) && (errno != EINPROGRESS) && (errno != EISCONN)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s", info->connect_sock, strerror(savederrno)); goto exit_error; } } if (!info->on_connected_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = info->connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add send/recv to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_connected_epoll = 1; } exit_error: errno = savederrno; return err; } static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = link->transport_link; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event ev; int connect_sock; connect_sock = socket(link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (connect_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, connect_sock, &link->dst_addr, "SCTP connect") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->connect_sock = connect_sock; info->close_sock = 0; if (_reconnect_socket(knet_h, link) < 0) { savederrno = errno; err = -1; goto exit_error; } exit_error: if (err) { if (info->on_connected_epoll) { epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev); } if (connect_sock >= 0) { close(connect_sock); } } errno = savederrno; return err; } /* * socket error management functions * * both called with global read lock. * * NOTE: we need to remove the fd from the epoll as soon as possible * even before we notify the respective thread to take care of it * because scheduling can make it so that this thread will overload * and the threads supposed to take care of the error will never * be able to take action. * we CANNOT handle FDs here diretly (close/reconnect/etc) due * to locking context. We need to delegate that to their respective * management threads within global write lock. * * this function is called from: * - RX thread with recv_err <= 0 directly on recvmmsg error * - transport_rx_is_data when msg_len == 0 (recv_err = 1) * - transport_rx_is_data on notification (recv_err = 2) * * basically this small abouse of recv_err is to detect notifications * generated by sockets created by listen(). */ static int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { struct epoll_event ev; sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: /* * all connect link have notifications enabled * and we accept only data from notification and * generic recvmmsg errors. * * Errors generated by msg_len 0 can be ignored because * they follow a notification (double notification) */ if (recv_err != 1) { connect_info->link->transport_connected = 0; if (connect_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } connect_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd); if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno)); } } break; case SCTP_LISTEN_LINK_INFO: if (listen_info->listen_sock != sockfd) { if (recv_err != 1) { if (listen_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } listen_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd); if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno)); } } } else { /* * this means the listen() socket has generated * a notification. now what? :-) */ log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd); } break; default: break; } return 0; } /* * NOTE: sctp_transport_rx_is_data is called with global rdlock * delegate any FD error management to sctp_transport_rx_sock_error * and keep this code to parsing incoming data only */ static int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct mmsghdr msg) { int i; struct iovec *iov = msg.msg_hdr.msg_iov; size_t iovlen = msg.msg_hdr.msg_iovlen; struct sctp_assoc_change *sac; union sctp_notification *snp; if (!(msg.msg_hdr.msg_flags & MSG_NOTIFICATION)) { if (msg.msg_len == 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "received 0 bytes len packet: %d", sockfd); /* * NOTE: with event notification enabled, we receive error twice: * 1) from the event notification * 2) followed by a 0 byte msg_len * * This is generally not a problem if not for causing extra * handling for the same issue. Should we drop notifications * and keep the code generic (handle all errors via msg_len = 0) * or keep the duplication as safety measure, or drop msg_len = 0 * handling (what about sockets without events enabled?) */ sctp_transport_rx_sock_error(knet_h, sockfd, 1, 0); return 1; } return 2; } if (!(msg.msg_hdr.msg_flags & MSG_EOR)) { return 1; } for (i=0; i< iovlen; i++) { snp = iov[i].iov_base; switch (snp->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change"); sac = &snp->sn_assoc_change; if (sac->sac_state == SCTP_COMM_LOST) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change: comm_lost"); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); } break; case SCTP_SHUTDOWN_EVENT: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event"); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); break; case SCTP_SEND_FAILED: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed"); break; case SCTP_PEER_ADDR_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change"); break; case SCTP_REMOTE_ERROR: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error"); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event type: %hu\n", snp->sn_header.sn_type); break; } } return 0; } /* * connect / outgoing socket management thread */ /* * _handle_connected_sctp* are called with a global write lock * from the connect_thread */ static void _handle_connected_sctp(knet_handle_t knet_h, int connect_sock) { int err; struct epoll_event ev; unsigned int status, len = sizeof(status); sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data; struct knet_link *link = info->link; err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len); if (err) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s", connect_sock, strerror(errno)); return; } if (info->close_sock) { if (_close_connect_socket(knet_h, link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp: %s", connect_sock, strerror(errno)); return; } info->close_sock = 0; if (_create_connect_socket(knet_h, link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno)); return; } } if (status) { log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s", connect_sock, link->status.dst_ipaddr, link->status.dst_port, strerror(status)); /* * No need to create a new socket if connect failed, * just retry connect */ _reconnect_socket(knet_h, info->link); return; } /* * Connected - Remove us from the connect epoll */ memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLOUT; ev.data.fd = connect_sock; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket %d from epoll pool: %s", connect_sock, strerror(errno)); } info->on_connected_epoll = 0; link->transport_connected = 1; link->outsock = info->connect_sock; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s", connect_sock, link->status.dst_ipaddr, link->status.dst_port); } static void _handle_connected_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *info; if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error"); return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd); info = knet_h->knet_transport_fd_tracker[sockfd].data; info->close_sock = 1; info->link->transport_connected = 0; _reconnect_socket(knet_h, info->link); } static void *_sctp_connect_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s", strerror(errno)); continue; } /* * Sort out which FD has a connection */ savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * minor optimization: deduplicate events * * in some cases we can receive multiple notifcations * of the same FD having issues or need handling. * It's enough to process it once even tho it's safe * to handle them multiple times. */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->connectsockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket"); _handle_connected_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_connected_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for dead fd %d\n", events[i].data.fd); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); /* * this thread can generate events for itself. * we need to sleep in between loops to allow other threads * to be scheduled */ usleep(KNET_SCTP_SLEEP_TIME); } return NULL; } /* * listen/incoming connections management thread */ /* * Listener received a new connection * called with a write lock from main thread */ static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock) { int err = 0, savederrno = 0; int new_fd; int i = -1; sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data; struct epoll_event ev; struct sockaddr_storage ss; socklen_t sock_len = sizeof(ss); char *print_str[2]; new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len); if (new_fd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno)); goto exit_error; } if (_transport_addrtostr((struct sockaddr *)&ss, sizeof(ss), print_str) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info"); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s", print_str[0], print_str[1]); /* * Keep a track of all accepted FDs */ for (i=0; iaccepted_socks[i] == -1) { info->accepted_socks[i] = new_fd; break; } } if (i == MAX_ACCEPTED_SOCKS) { errno = EBUSY; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!"); goto exit_error; } if (_configure_common_socket(knet_h, new_fd, "SCTP incoming") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) { savederrno = errno; err = -1; goto exit_error; } if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_LISTEN_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(errno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = new_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s", new_fd, strerror(errno)); goto exit_error; } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s (listen fd: %d). index: %d", new_fd, print_str[0], info->listen_sock, i); exit_error: _transport_addrtostr_free(print_str); if (err) { if ((i >= 0) || (i < MAX_ACCEPTED_SOCKS)) { info->accepted_socks[i] = -1; } _set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); close(new_fd); } errno = savederrno; return; } /* * Listen thread received a notification of a bad socket that needs closing * called with a write lock from main thread */ static void _handle_listen_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_listen_link_info_t *info; int i; if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error"); return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd); info = knet_h->knet_transport_fd_tracker[sockfd].data; for (i=0; iaccepted_socks[i]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd); _set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL); info->accepted_socks[i] = -1; close(sockfd); } } } static void *_sctp_listen_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s", strerror(errno)); continue; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * Sort out which FD has an incoming connection */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->listensockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket"); _handle_listen_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_incoming_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket"); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } /* * sctp_link_listener_start/stop are called in global write lock * context from set_config and clear_config. */ static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; int listen_sock = -1; struct epoll_event ev; sctp_listen_link_info_t *info = NULL; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * Only allocate a new listener if src address is different */ knet_list_for_each_entry(info, &handle_info->listen_links_list, list) { if (memcmp(&info->src_address, &link->src_addr, sizeof(struct sockaddr_storage)) == 0) { return info; } } info = malloc(sizeof(sctp_listen_link_info_t)); if (!info) { err = -1; goto exit_error; } memset(info, 0, sizeof(sctp_listen_link_info_t)); memset(info->accepted_socks, -1, sizeof(info->accepted_socks)); memcpy(&info->src_address, &link->src_addr, sizeof(struct sockaddr_storage)); listen_sock = socket(link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (listen_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, listen_sock, &link->src_addr, "SCTP listener") < 0) { savederrno = errno; err = -1; goto exit_error; } if (bind(listen_sock, (struct sockaddr *)&link->src_addr, sizeof(struct sockaddr_storage)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } if (listen(listen_sock, 5) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTEN_LINK_INFO, info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 1; info->listen_sock = listen_sock; knet_list_add(&info->list, &handle_info->listen_links_list); log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, link->status.src_ipaddr, link->status.src_port); exit_error: if (err) { if (info->on_listener_epoll) { epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev); } if (listen_sock >= 0) { close(listen_sock); } if (info) { free(info); info = NULL; } } errno = savederrno; return info; } static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; int found = 0, i; struct knet_host *host; int link_idx; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *this_link_info = link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; sctp_connect_link_info_t *link_info; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&host->link[link_idx] == link) continue; link_info = host->link[link_idx].transport_link; if ((link_info) && (link_info->listener == info) && (host->link[link_idx].status.enabled == 1)) { found = 1; break; } } } if (found) { this_link_info->listener = NULL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_listener_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 0; } if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } close(info->listen_sock); for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { if (info->accepted_socks[i] > -1) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->accepted_socks[i]; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 0; close(info->accepted_socks[i]); if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->accepted_socks[i] = -1; } } knet_list_del(&info->list); free(info); this_link_info->listener = NULL; exit_error: errno = savederrno; return err; } /* * Links config/clear. Both called with global wrlock from link_set_config/clear_config */ static int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *link) { int savederrno = 0, err = 0; sctp_connect_link_info_t *info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; info = malloc(sizeof(sctp_connect_link_info_t)); if (!info) { goto exit_error; } memset(info, 0, sizeof(sctp_connect_link_info_t)); link->transport_link = info; info->link = link; memcpy(&info->dst_address, &link->dst_addr, sizeof(struct sockaddr_storage)); info->on_connected_epoll = 0; info->connect_sock = -1; info->listener = sctp_link_listener_start(knet_h, link); if (!info->listener) { savederrno = errno; err = -1; goto exit_error; } if (_create_connect_socket(knet_h, link) < 0) { savederrno = errno; err = -1; goto exit_error; } knet_list_add(&info->list, &handle_info->connect_links_list); link->outsock = info->connect_sock; exit_error: if (err) { if (info) { if (info->connect_sock) { close(info->connect_sock); } if (info->listener) { sctp_link_listener_stop(knet_h, link); } link->transport_link = NULL; free(info); } } errno = savederrno; return err; } /* * called with global wrlock * FIX exit path error handling */ static int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info; struct epoll_event ev; if (!link) { errno = EINVAL; return -1; } info = link->transport_link; if (!info) { errno = EINVAL; return -1; } if ((sctp_link_listener_stop(knet_h, link) <0) && (errno != EBUSY)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener trasport: %s", strerror(savederrno)); goto exit_error; } if (info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_rx_epoll = 0; } if (_close_connect_socket(knet_h, link) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s", strerror(savederrno)); goto exit_error; } knet_list_del(&info->list); free(info); link->transport_link = NULL; exit_error: errno = savederrno; return err; } /* * transport_free and transport_init are * called only from knet_handle_new and knet_handle_free. * all resources (hosts/links) should have been already freed at this point * and they are called in a write locked context, hence they * don't need their own locking. */ static int sctp_transport_free(knet_handle_t knet_h) { sctp_handle_info_t *handle_info; void *thread_status; struct epoll_event ev; if (!knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * keep it here while we debug list usage and such */ if (!knet_list_empty(&handle_info->listen_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty"); } if (!knet_list_empty(&handle_info->connect_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty"); } if (handle_info->listen_thread) { pthread_cancel(handle_info->listen_thread); pthread_join(handle_info->listen_thread, &thread_status); } if (handle_info->connect_thread) { pthread_cancel(handle_info->connect_thread); pthread_join(handle_info->connect_thread, &thread_status); } if (handle_info->listensockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev); } if (handle_info->connectsockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev); } _close_socketpair(knet_h, handle_info->connectsockfd); _close_socketpair(knet_h, handle_info->listensockfd); if (handle_info->listen_epollfd >= 0) { close(handle_info->listen_epollfd); } if (handle_info->connect_epollfd >= 0) { close(handle_info->connect_epollfd); } free(handle_info); knet_h->transports[KNET_TRANSPORT_SCTP] = NULL; return 0; } static int sctp_transport_init(knet_handle_t knet_h) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info; struct epoll_event ev; if (knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(sctp_handle_info_t)); if (!handle_info) { return -1; } memset(handle_info, 0,sizeof(sctp_handle_info_t)); knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info; knet_list_init(&handle_info->listen_links_list); knet_list_init(&handle_info->connect_links_list); handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->listen_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->listen_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s", strerror(savederrno)); goto exit_fail; } handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->connect_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->connect_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s", strerror(savederrno)); goto exit_fail; } /* * Start connect & listener threads */ savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s", strerror(savederrno)); goto exit_fail; } savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s", strerror(savederrno)); goto exit_fail; } exit_fail: if (err < 0) { sctp_transport_free(knet_h); } errno = savederrno; return err; } static knet_transport_ops_t sctp_transport_ops = { .transport_name = "SCTP", .transport_mtu_overhead = KNET_PMTUD_SCTP_OVERHEAD, .transport_init = sctp_transport_init, .transport_free = sctp_transport_free, .transport_link_set_config = sctp_transport_link_set_config, .transport_link_clear_config = sctp_transport_link_clear_config, .transport_rx_sock_error = sctp_transport_rx_sock_error, .transport_rx_is_data = sctp_transport_rx_is_data, }; knet_transport_ops_t *get_sctp_transport() { return &sctp_transport_ops; } #else // HAVE_NETINET_SCTP_H knet_transport_ops_t *get_sctp_transport() { return NULL; } #endif