diff --git a/libknet/libknet.h b/libknet/libknet.h index ee9e6d93..a7ebb7c6 100644 --- a/libknet/libknet.h +++ b/libknet/libknet.h @@ -1,1344 +1,1344 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #ifndef __LIBKNET_H__ #define __LIBKNET_H__ #include #include /* * libknet limits */ /* * Maximum number of hosts */ #define KNET_MAX_HOST 65536 /* * Maximum number of links between 2 hosts */ #define KNET_MAX_LINK 8 /* * Maximum packet size that should be written to datafd * see knet_handle_new for details */ #define KNET_MAX_PACKET_SIZE 65536 /* * Buffers used for pretty logging * host is used to store both ip addresses and hostnames */ #define KNET_MAX_HOST_LEN 64 #define KNET_MAX_PORT_LEN 6 /* * Some notifications can be generated either on TX or RX */ #define KNET_NOTIFY_TX 0 #define KNET_NOTIFY_RX 1 typedef struct knet_handle *knet_handle_t; /* * Handle structs/API calls */ /* * knet_handle_new * * host_id - Each host in a knet is identified with a unique * ID. when creating a new handle local host_id * must be specified (0 to UINT16T_MAX are all valid). * It is the user's responsibility to check that the value * is unique, or bad things might happen. * * log_fd - Write file descriptor. If set to a value > 0, it will be used * to write log packets (see below) from libknet to the application. * Setting to 0 will disable logging from libknet. * It is possible to enable logging at any given time (see logging API * below). * Make sure to either read from this filedescriptor properly and/or * mark it O_NONBLOCK, otherwise if the fd becomes full, libknet could * block. * * default_log_level - * If logfd is specified, it will initialize all subsystems to log * at default_log_level value. (see logging API below) * * on success, a new knet_handle_t is returned. * on failure, NULL is returned and errno is set. 
*/ knet_handle_t knet_handle_new(uint16_t host_id, int log_fd, uint8_t default_log_level); /* * knet_handle_free * * knet_h - pointer to knet_handle_t * * Destroy a knet handle, free all resources * * knet_handle_free returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_free(knet_handle_t knet_h); /* * knet_handle_enable_sock_notify * * knet_h - pointer to knet_handle_t * * sock_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * sock_notify_fn * A callback function that is invoked every time * a socket in the datafd pool will report an error (-1) * or an end of read (0) (see socket.7). * This function MUST NEVER block or add substantial delays. * The callback is invoked in an internal unlocked area * to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd * to swap/replace the bad fd. * if both error and errorno are 0, it means that the socket * has received a 0 byte packet (EOF?). * The callback function must either remove the fd from knet * (by calling knet_handle_remove_datafd()) or dup a new fd in its place. * Failure to do this can cause problems. * * knet_handle_enable_sock_notify returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_sock_notify(knet_handle_t knet_h, void *sock_notify_fn_private_data, void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno)); /* sorry! can't call it errno ;) */ /* * knet_handle_add_datafd * * IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify * _MUST_ be set and be able to handle both errors (-1) and * 0 bytes read / write from the provided datafd. * On read error (< 0) from datafd, the socket is automatically * removed from polling to avoid spinning on dead sockets. * It is safe to call knet_handle_remove_datafd even on sockets * that have been removed. * * knet_h - pointer to knet_handle_t * * *datafd - read/write file descriptor.
* knet will read data here to send to the other hosts * and will write data received from the network. * Each data packet can be of max size KNET_MAX_PACKET_SIZE! * Applications using knet_send/knet_recv will receive a * proper error if the packet size is not within boundaries. * Applications using their own functions to write to the * datafd should NOT write more than KNET_MAX_PACKET_SIZE. * * Please refer to handle.c on how to set up a socketpair. * * datafd can be 0, and knet_handle_add_datafd will create a properly * populated socket pair the same way as ping_test, or a value * higher than 0. A negative number will return an error. * On exit knet_handle_free will take care to cleanup the * socketpair only if they have been created by knet_handle_add_datafd. * * It is possible to pass either sockets or normal fds. * User provided datafd will be marked as non-blocking and close-on-exit. * * *channel - This value has the same effect of VLAN tagging. * A negative value will auto-allocate a channel. * Setting a value between 0 and 31 will try to allocate that * specific channel (unless already in use). * * It is possible to add up to 32 datafds but be aware that each * one of them must have a receiving end on the other host. * * Example: * hostA channel 0 will be delivered to datafd on hostB channel 0 * hostA channel 1 to hostB channel 1. * * Each channel must have a unique file descriptor. * * If your application could have 2 channels on one host and one * channel on another host, then you can use dst_host_filter * to manipulate channel values on TX and RX. * * knet_handle_add_datafd returns: * * 0 on success * *datafd will be populated with a socket if the original value was 0 * or if a specific fd was set, the value is untouched. * *channel will be populated with a channel number if the original value * was negative or the value is untouched if a specific channel * was requested. * * -1 on error and errno is set. * *datafd and *channel are untouched or empty. 
*/ #define KNET_DATAFD_MAX 32 int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel); /* * knet_handle_remove_datafd * * knet_h - pointer to knet_handle_t * * datafd - file descriptor to remove. * NOTE that if the socket/fd was created by knet_handle_add_datafd, * the socket will be closed by libknet. * * knet_handle_remove_datafd returns: * * 0 on success * * -1 on error and errno is set. */ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd); /* * knet_handle_get_channel * * knet_h - pointer to knet_handle_t * * datafd - get the channel associated to this datafd * * *channel - will contain the result * * knet_handle_get_channel returns: * * 0 on success * and *channel will contain the results * * -1 on error and errno is set. * and *channel content is meaningless */ int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel); /* * knet_handle_get_datafd * * knet_h - pointer to knet_handle_t * * channel - get the datafd associated to this channel * * *datafd - will contain the result * * knet_handle_get_datafd returns: * * 0 on success * and *datafd will contain the results * * -1 on error and errno is set.
* and *datafd content is meaningless */ int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd); /* * knet_recv * * knet_h - pointer to knet_handle_t * * buff - pointer to buffer to store the received data * * buff_len - buffer length * * channel - data channel to use (see knet_handle_add_datafd) * * knet_recv is a commodity function to wrap iovec operations * around a socket. It returns a call to readv(2). */ ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel); /* * knet_send * * knet_h - pointer to knet_handle_t * * buff - pointer to the buffer of data to send * * buff_len - length of data to send * * channel - data channel to use (see knet_handle_add_datafd) * * knet_send is a commodity function to wrap iovec operations * around a socket. It returns a call to writev(2). */ ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel); /* * knet_send_sync * * knet_h - pointer to knet_handle_t * * buff - pointer to the buffer of data to send * * buff_len - length of data to send * * channel - data channel to use (see knet_handle_add_datafd) * * All knet RX/TX operations are async for performance reasons. * There are applications that might need a sync version of data * transmission and receive errors in case of failure to deliver * to another host. * knet_send_sync bypasses the whole TX async layer and delivers * data directly to the link layer, and returns errors accordingly. * knet_send_sync allows to send only one packet to one host at * a time. It does NOT support multiple destinations or multicast * packets. Decision is still based on dst_host_filter_fn. * * knet_send_sync returns 0 on success and -1 on error.
* * In addition to normal sendmmsg errors, knet_send_sync can fail * due to: * * ECANCELED - data forward is disabled * EFAULT - dst_host_filter fatal error * EINVAL - dst_host_filter did not provide * dst_host_ids_entries on unicast pckts * E2BIG - dst_host_filter did return more than one * dst_host_ids_entries on unicast pckts * ENOMSG - received unknown message type * EHOSTDOWN - unicast pckt cannot be delivered because * dest host is not connected yet * ECHILD - crypto failed * EAGAIN - sendmmsg was unable to send all messages and * there was no progress during retry */ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel); /* * knet_handle_enable_filter * * knet_h - pointer to knet_handle_t * * dst_host_filter_fn_private_data * void pointer to data that can be used to identify * the callback. * * dst_host_filter_fn - * is a callback function that is invoked every time * a packet hits datafd (see knet_handle_add_datafd). * the function allows users to tell libknet where the * packet has to be delivered. * * const unsigned char *outdata - is a pointer to the * current packet * ssize_t outdata_len - length of the above data * uint8_t tx_rx - filter is called on tx or rx * (see KNET_NOTIFY_TX/KNET_NOTIFY_RX above) * uint16_t this_host_id - host_id processing the packet * uint16_t src_host_id - host_id that generated the * packet * int8_t *channel - channel of the packet, the filter can * change it (see knet_handle_add_datafd) * uint16_t *dst_host_ids - array of KNET_MAX_HOST uint16_t * where to store the destinations * size_t *dst_host_ids_entries - number of hosts to send the message * * dst_host_filter_fn should return * -1 on error, packet is discarded. * 0 packet is unicast and should be sent to dst_host_ids and there are * dst_host_ids_entries in the buffer. * 1 packet is broadcast/multicast and is sent to all hosts. * contents of dst_host_ids and dst_host_ids_entries are ignored.
* (see also kronosnetd/etherfilter.* for an example that filters based * on ether protocol) * * knet_handle_enable_filter returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_filter(knet_handle_t knet_h, void *dst_host_filter_fn_private_data, int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, uint16_t this_host_id, uint16_t src_host_id, int8_t *channel, uint16_t *dst_host_ids, size_t *dst_host_ids_entries)); /* * knet_handle_setfwd * * knet_h - pointer to knet_handle_t * * enabled - set to 1 to allow data forwarding, 0 to disable data forwarding. * * knet_handle_setfwd returns: * * 0 on success * -1 on error and errno is set. * * By default data forwarding is off and no traffic will pass through knet until * it is set on. */ int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled); /* * knet_handle_pmtud_setfreq * * knet_h - pointer to knet_handle_t * * interval - define the interval in seconds between PMTUd scans * range from 1 to 86400 (24h) * * knet_handle_pmtud_setfreq returns: * * 0 on success * -1 on error and errno is set. * * default interval is 60. */ #define KNET_PMTUD_DEFAULT_INTERVAL 60 int knet_handle_pmtud_setfreq(knet_handle_t knet_h, unsigned int interval); /* * knet_handle_pmtud_getfreq * * knet_h - pointer to knet_handle_t * * interval - pointer where to store the current interval value * * knet_handle_pmtud_getfreq returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval); /* * knet_handle_enable_pmtud_notify * * knet_h - pointer to knet_handle_t * * pmtud_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * pmtud_notify_fn * is a callback function that is invoked every time * a path MTU size change is detected.
* The function allows libknet to notify the user * of data MTU, that's the max value that can be sent * onwire without fragmentation. The data MTU will always * be lower than real link MTU because it accounts for * protocol overhead, knet packet header and (if configured) * crypto overhead. * This function MUST NEVER block or add substantial delays. * * knet_handle_enable_pmtud_notify returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_enable_pmtud_notify(knet_handle_t knet_h, void *pmtud_notify_fn_private_data, void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu)); /* * knet_handle_pmtud_get * * knet_h - pointer to knet_handle_t * * data_mtu - pointer where to store data_mtu (see above) * * knet_handle_pmtud_get returns: * * 0 on success * -1 on error and errno is set. */ int knet_handle_pmtud_get(knet_handle_t knet_h, unsigned int *data_mtu); /* * knet_handle_crypto * * knet_h - pointer to knet_handle_t * * knet_handle_crypto_cfg - * pointer to a knet_handle_crypto_cfg structure * * crypto_model should contain the model name. * Currently only "nss" is supported. * Setting to "none" will disable crypto. * * crypto_cipher_type * should contain the cipher algo name. * It can be set to "none" to disable * encryption. * Currently supported by "nss" model: * "3des", "aes128", "aes192" and "aes256". * * crypto_hash_type * should contain the hashing algo name. * It can be set to "none" to disable * hashing. * Currently supported by "nss" model: * "md5", "sha1", "sha256", "sha384" and "sha512". * * private_key will contain the private shared key. * It has to be at least KNET_MIN_KEY_LEN long. * * private_key_len * length of the provided private_key. * * Implementation notes/current limitations: * - enabling crypto will increase latency as packets have * to be processed. * - enabling crypto might reduce the overall throughput * due to crypto data overhead. * - re-keying is not implemented yet.
* - private/public key encryption/hashing is not currently * planned. * - crypto key must be the same for all hosts in the same * knet instance. * - it is safe to call knet_handle_crypto multiple times at runtime. * The last config will be used. * IMPORTANT: a call to knet_handle_crypto can fail due to: * 1) failure to obtain locking * 2) errors to initializing the crypto level. * This can happen even in subsequent calls to knet_handle_crypto. * A failure in crypto init, might leave your traffic unencrypted! * It's best to stop data forwarding (see above), change crypto config, * start forward again. * * knet_handle_crypto returns: * * 0 on success * -1 on error and errno is set. * -2 on crypto subsystem initialization error. No errno is provided at the moment (yet). */ #define KNET_MIN_KEY_LEN 1024 #define KNET_MAX_KEY_LEN 4096 struct knet_handle_crypto_cfg { char crypto_model[16]; char crypto_cipher_type[16]; char crypto_hash_type[16]; unsigned char private_key[KNET_MAX_KEY_LEN]; unsigned int private_key_len; }; int knet_handle_crypto(knet_handle_t knet_h, struct knet_handle_crypto_cfg *knet_handle_crypto_cfg); /* * host structs/API calls */ /* * knet_host_add * * knet_h - pointer to knet_handle_t * * host_id - each host in a knet is identified with a unique ID * (see also knet_handle_new documentation above) * * knet_host_add returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_add(knet_handle_t knet_h, uint16_t host_id); /* * knet_host_remove * * knet_h - pointer to knet_handle_t * * host_id - each host in a knet is identified with a unique ID * (see also knet_handle_new documentation above) * * knet_host_remove returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_remove(knet_handle_t knet_h, uint16_t host_id); /* * knet_host_set_name * * knet_h - pointer to knet_handle_t * * host_id - see above * * name - this name will be used for pretty logging and eventually * search for hosts (see also get_name and get_id below). 
* Only up to KNET_MAX_HOST_LEN - 1 bytes will be accepted and * name has to be unique for each host. * * knet_host_set_name returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_set_name(knet_handle_t knet_h, uint16_t host_id, const char *name); /* * knet_host_get_name_by_host_id * * knet_h - pointer to knet_handle_t * * host_id - see above * * name - pointer to a preallocated buffer of at least size KNET_MAX_HOST_LEN * where the current host name will be stored * (as set by knet_host_set_name or default by knet_host_add) * * knet_host_get_name_by_host_id returns: * * 0 on success * -1 on error and errno is set (name is left untouched) */ int knet_host_get_name_by_host_id(knet_handle_t knet_h, uint16_t host_id, char *name); /* * knet_host_get_id_by_host_name * * knet_h - pointer to knet_handle_t * * name - name to lookup, max len KNET_MAX_HOST_LEN * * host_id - where to store the result * * knet_host_get_id_by_host_name returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_id_by_host_name(knet_handle_t knet_h, const char *name, uint16_t *host_id); /* * knet_host_get_host_list * * knet_h - pointer to knet_handle_t * * host_ids - array of at least KNET_MAX_HOST size * * host_ids_entries - * number of entries written in host_ids * * knet_host_get_host_list returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_host_list(knet_handle_t knet_h, uint16_t *host_ids, size_t *host_ids_entries); /* * define switching policies */ #define KNET_LINK_POLICY_PASSIVE 0 #define KNET_LINK_POLICY_ACTIVE 1 #define KNET_LINK_POLICY_RR 2 /* * knet_host_set_policy * * knet_h - pointer to knet_handle_t * * host_id - see above * * policy - there are currently 3 kinds of simple switching policies * as defined above, based on link configuration. * KNET_LINK_POLICY_PASSIVE - the active link with the lowest * priority will be used.
* if one or more active links share * the same priority, the one with * lowest link_id will be used. * * KNET_LINK_POLICY_ACTIVE - all active links will be used * simultaneously to send traffic. * link priority is ignored. * * KNET_LINK_POLICY_RR - round-robin policy, every packet * will be sent on a different active * link. * * knet_host_set_policy returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_set_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t policy); /* * knet_host_get_policy * * knet_h - pointer to knet_handle_t * * host_id - see above * * policy - will contain the current configured switching policy. * Default is passive when creating a new host. * * knet_host_get_policy returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_policy(knet_handle_t knet_h, uint16_t host_id, uint8_t *policy); /* * knet_host_enable_status_change_notify * * knet_h - pointer to knet_handle_t * * host_status_change_notify_fn_private_data * void pointer to data that can be used to identify * the callback. * * host_status_change_notify_fn * is a callback function that is invoked every time * there is a change in the host status. * host status is identified by: * - reachable, this host can send/receive data to/from host_id * - remote, 0 if the host_id is connected locally or 1 if * there is one or more knet host(s) in between. * NOTE: re-switching is NOT currently implemented, * but this is ready for future and can avoid * an API/ABI breakage later on. * - external, 0 if the host_id is configured locally or 1 if * it has been added from remote nodes config. * NOTE: dynamic topology is NOT currently implemented, * but this is ready for future and can avoid * an API/ABI breakage later on. * This function MUST NEVER block or add substantial delays. * * knet_host_enable_status_change_notify returns: * * 0 on success * -1 on error and errno is set.
*/ int knet_host_enable_status_change_notify(knet_handle_t knet_h, void *host_status_change_notify_fn_private_data, void (*host_status_change_notify_fn) ( void *private_data, uint16_t host_id, uint8_t reachable, uint8_t remote, uint8_t external)); /* * define host status structure for quick lookup * struct is in flux as more stats will be added soon * * reachable host_id can be seen either directly connected * or via another host_id * * remote 0 = node is connected locally, 1 = node is visible * via another host_id * * external 0 = node is configured/known locally, * 1 host_id has been received via another host_id */ struct knet_host_status { uint8_t reachable; uint8_t remote; uint8_t external; /* add host statistics */ }; /* * knet_host_get_status * * knet_h - pointer to knet_handle_t * * host_id - see above * * status - pointer to knet_host_status struct (see above) * * knet_host_get_status returns: * * 0 on success * -1 on error and errno is set. */ int knet_host_get_status(knet_handle_t knet_h, uint16_t host_id, struct knet_host_status *status); /* * link structs/API calls * * every host allocated/managed by knet_host_* has * KNET_MAX_LINK structures to define the network * paths that connect 2 hosts. * * Each link is identified by a link_id that has a * value between 0 and KNET_MAX_LINK - 1. * * KNOWN LIMITATIONS: * * - let's assume the scenario where two hosts are connected * with any number of links. link_id must match on both sides. * If host_id 0 link_id 0 is configured to connect IP1 to IP2 and * host_id 0 link_id 1 is configured to connect IP3 to IP4, * host_id 1 link_id 0 _must_ connect IP2 to IP1 and likewise * host_id 1 link_id 1 _must_ connect IP4 to IP3. * We might be able to lift this restriction in future, by using * other data to determine src/dst link_id, but for now, deal with it.
* * - */ #define KNET_TRANSPORT_UDP 0 #define KNET_TRANSPORT_SCTP 1 #define KNET_MAX_TRANSPORTS 2 /* * knet_link_set_config * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * transport - one of the above KNET_TRANSPORT_xxx constants * * src_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dst_addr - sockaddr_storage that can be either IPv4 or IPv6 * this can be null if we don't know the incoming * IP address/port and the link will remain quiet * till the node on the other end will initiate a * connection * * knet_link_set_config returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr); /* * knet_link_get_config * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * transport - see above * * src_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dst_addr - sockaddr_storage that can be either IPv4 or IPv6 * * dynamic - 0 if dst_addr is static or 1 if dst_addr is dynamic. * In case of 1, dst_addr can be NULL and it will be left * untouched. * * knet_link_get_config returns: * * 0 on success. * -1 on error and errno is set. */ int knet_link_get_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint8_t *dynamic); /* * knet_link_set_enable * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * enabled - 0 disable the link, 1 enable the link * * knet_link_set_enable returns: * * 0 on success * -1 on error and errno is set. 
*/ int knet_link_set_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, unsigned int enabled); /* * knet_link_get_enable * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * enabled - 0 disable the link, 1 enable the link * * knet_link_get_enable returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_enable(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, unsigned int *enabled); /* * knet_link_set_ping_timers * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * interval - specify the ping interval * * timeout - if no pong is received within this time, * the link is declared dead * * precision - how many values of latency are used to calculate * the average link latency (see also get_status below) * * knet_link_set_ping_timers returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_LINK_DEFAULT_PING_INTERVAL 1000 /* 1 second */ #define KNET_LINK_DEFAULT_PING_TIMEOUT 2000 /* 2 seconds */ #define KNET_LINK_DEFAULT_PING_PRECISION 2048 /* samples */ int knet_link_set_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, time_t interval, time_t timeout, unsigned int precision); /* * knet_link_get_ping_timers * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * interval - ping intervall * * timeout - if no pong is received within this time, * the link is declared dead * * precision - how many values of latency are used to calculate * the average link latency (see also get_status below) * * knet_link_get_ping_timers returns: * * 0 on success * -1 on error and errno is set. 
*/ int knet_link_get_ping_timers(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, time_t *interval, time_t *timeout, unsigned int *precision); /* * knet_link_set_pong_count * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * pong_count - how many valid ping/pongs before a link is marked UP. * default: 5, value should be > 0 * * knet_link_set_pong_count returns: * * 0 on success * -1 on error and errno is set. */ #define KNET_LINK_DEFAULT_PONG_COUNT 5 int knet_link_set_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t pong_count); /* * knet_link_get_pong_count * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * pong_count - see above * * knet_link_get_pong_count returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_pong_count(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *pong_count); /* * knet_link_set_priority * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * priority - specify the switching priority for this link * see also knet_host_set_policy * * knet_link_set_priority returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_set_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t priority); /* * knet_link_get_priority * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * priority - gather the switching priority for this link * see also knet_host_set_policy * * knet_link_get_priority returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_priority(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, uint8_t *priority); /* * knet_link_get_link_list * * knet_h - pointer to knet_handle_t * * link_ids - array of at lest KNET_MAX_LINK size * with the list of configured links for a certain host. 
* * link_ids_entries - * number of entries contained in link_ids * * knet_link_get_link_list returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_link_list(knet_handle_t knet_h, uint16_t host_id, uint8_t *link_ids, size_t *link_ids_entries); /* * define link status structure for quick lookup * struct is in flux as more stats will be added soon * * src/dst_{ipaddr,port} strings are filled by * getnameinfo(3) when configuring the link. * if the link is dynamic (see knet_link_set_config) * dst_ipaddr/port will contain ipaddr/port of the currently * connected peer or "Unknown" if it was not possible * to determine the ipaddr/port at runtime. * * enabled see also knet_link_set/get_enable. * * connected the link is connected to a peer and ping/pong traffic * is flowing. * * dynconnected the link has dynamic ip on the other end, and * we can see the other host is sending pings to us. * * latency average latency of this link * see also knet_link_set/get_timeout. * * pong_last if the link is down, this value tells us how long * ago this link was active. A value of 0 means that the link * has never been active. */ struct knet_link_status { char src_ipaddr[KNET_MAX_HOST_LEN]; char src_port[KNET_MAX_PORT_LEN]; char dst_ipaddr[KNET_MAX_HOST_LEN]; char dst_port[KNET_MAX_PORT_LEN]; unsigned int enabled:1; /* link is configured and admin enabled for traffic */ unsigned int connected:1; /* link is connected for data (local view) */ unsigned int dynconnected:1; /* link has been activated by remote dynip */ unsigned long long latency; /* average latency computed by fix/exp */ struct timespec pong_last; unsigned int mtu; /* current detected MTU on this link */ unsigned int proto_overhead; /* contains the size of the IP protocol, knet headers and * crypto headers (if configured). This value is filled in * ONLY after the first PMTUd run on that given link, * and can change if link configuration or crypto configuration * changes at runtime. 
* WARNING: in general mtu + proto_overhead might or might * not match the output of ifconfig mtu due to crypto * requirements to pad packets to some specific boundaries. */ /* add link statistics */ }; /* * knet_link_get_status * * knet_h - pointer to knet_handle_t * * host_id - see above * * link_id - see above * * status - pointer to knet_link_status struct (see above) * * knet_link_get_status returns: * * 0 on success * -1 on error and errno is set. */ int knet_link_get_status(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id, struct knet_link_status *status); /* * logging structs/API calls */ /* * libknet is composed of several subsystems. In order * to easily distinguish log messages coming from different * places, each subsystem has its own ID. * * 0-19 config/management * 20-39 internal threads * 40-59 transports * 60-69 crypto implementations */ #define KNET_SUB_COMMON 0 /* common.c */ #define KNET_SUB_HANDLE 1 /* handle.c alloc/dealloc config changes */ #define KNET_SUB_HOST 2 /* host add/del/modify */ #define KNET_SUB_LISTENER 3 /* listeners add/del/modify... 
*/ #define KNET_SUB_LINK 4 /* link add/del/modify */ #define KNET_SUB_TRANSPORT 5 /* Transport common */ #define KNET_SUB_CRYPTO 6 /* crypto.c config generic layer */ #define KNET_SUB_FILTER 19 /* allocated for users to log from dst_filter */ #define KNET_SUB_DSTCACHE 20 /* switching thread (destination cache handling) */ #define KNET_SUB_HEARTBEAT 21 /* heartbeat thread */ #define KNET_SUB_PMTUD 22 /* Path MTU Discovery thread */ #define KNET_SUB_TX 23 /* send to link thread */ -#define KNET_SUB_LINK_T 24 /* recv from link thread */ +#define KNET_SUB_RX 24 /* recv from link thread */ #define KNET_SUB_UDP_LINK_T 40 /* UDP Transport */ #define KNET_SUB_SCTP_LINK_T 41 /* SCTP Transport */ #define KNET_SUB_NSSCRYPTO 60 /* nsscrypto.c */ #define KNET_SUB_UNKNOWN 254 #define KNET_MAX_SUBSYSTEMS KNET_SUB_UNKNOWN + 1 /* * Convert between subsystem IDs and names */ /* * knet_log_get_subsystem_name * * return internal name of the subsystem or "unknown" for unrecognized IDs */ const char *knet_log_get_subsystem_name(uint8_t subsystem); /* * knet_log_get_subsystem_id * * return internal ID of the subsystem or KNET_SUB_UNKNOWN for unrecognized names */ uint8_t knet_log_get_subsystem_id(const char *name); /* * 4 log levels are enough for everybody */ #define KNET_LOG_ERR 0 /* unrecoverable errors/conditions */ #define KNET_LOG_WARN 1 /* recoverable errors/conditions */ #define KNET_LOG_INFO 2 /* info, link up/down, config changes.. */ #define KNET_LOG_DEBUG 3 /* * Convert between log level values and names */ /* * knet_log_get_loglevel_name * * return internal name of the log level or "ERROR" for unknown values */ const char *knet_log_get_loglevel_name(uint8_t level); /* * knet_log_get_loglevel_id * * return internal log level ID or KNET_LOG_ERR for invalid names */ uint8_t knet_log_get_loglevel_id(const char *name); /* * every log message is composed by a text message (including a trailing \n) * and message level/subsystem IDs.
* In order to make debugging easier it is possible to send those packets * straight to stdout/stderr (see ping_test.c stdout option). */ #define KNET_MAX_LOG_MSG_SIZE 256 struct knet_log_msg { char msg[KNET_MAX_LOG_MSG_SIZE - (sizeof(uint8_t)*2)]; uint8_t subsystem; /* KNET_SUB_* */ uint8_t msglevel; /* KNET_LOG_* */ }; /* * knet_log_set_log_level * * knet_h - same as above * * subsystem - same as above * * level - same as above * * knet_log_set_loglevel allows fine control of log levels by subsystem. * See also knet_handle_new for defaults. * * knet_log_set_loglevel returns: * * 0 on success * -1 on error and errno is set. */ int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t level); /* * knet_log_get_log_level * * knet_h - same as above * * subsystem - same as above * * level - same as above * * knet_log_get_loglevel returns: * * 0 on success * -1 on error and errno is set. */ int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t *level); #endif diff --git a/libknet/logging.c b/libknet/logging.c index 8431dc66..8b935824 100644 --- a/libknet/logging.c +++ b/libknet/logging.c @@ -1,246 +1,246 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Author: Fabio M. 
Di Nitto * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include "internals.h" #include "logging.h" struct pretty_names { const char *name; uint8_t val; }; static struct pretty_names subsystem_names[] = { { "common", KNET_SUB_COMMON }, { "handle", KNET_SUB_HANDLE }, { "host", KNET_SUB_HOST }, { "listener", KNET_SUB_LISTENER }, { "link", KNET_SUB_LINK }, { "transport", KNET_SUB_TRANSPORT }, { "crypto", KNET_SUB_CRYPTO }, { "filter", KNET_SUB_FILTER }, { "dstcache", KNET_SUB_DSTCACHE }, { "heartbeat", KNET_SUB_HEARTBEAT }, { "pmtud", KNET_SUB_PMTUD }, { "tx", KNET_SUB_TX }, - { "link_t", KNET_SUB_LINK_T }, + { "rx", KNET_SUB_RX }, { "udp_t", KNET_SUB_UDP_LINK_T }, { "sctp_t", KNET_SUB_SCTP_LINK_T }, { "nsscrypto", KNET_SUB_NSSCRYPTO }, { "unknown", KNET_SUB_UNKNOWN } /* unknown MUST always be last in this array */ }; const char *knet_log_get_subsystem_name(uint8_t subsystem) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if (subsystem_names[i].val == KNET_SUB_UNKNOWN) { break; } if (subsystem_names[i].val == subsystem) { return subsystem_names[i].name; } } return "unknown"; } uint8_t knet_log_get_subsystem_id(const char *name) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if (subsystem_names[i].val == KNET_SUB_UNKNOWN) { break; } if (strcasecmp(name, subsystem_names[i].name) == 0) { return subsystem_names[i].val; } } return KNET_SUB_UNKNOWN; } static int is_valid_subsystem(uint8_t subsystem) { unsigned int i; for (i = 0; i < KNET_MAX_SUBSYSTEMS; i++) { if ((subsystem != KNET_SUB_UNKNOWN) && (subsystem_names[i].val == KNET_SUB_UNKNOWN)) { break; } if (subsystem_names[i].val == subsystem) { return 0; } } return -1; } static struct pretty_names loglevel_names[] = { { "ERROR", KNET_LOG_ERR }, { "WARNING", KNET_LOG_WARN }, { "info", KNET_LOG_INFO }, { "debug", KNET_LOG_DEBUG } }; const char *knet_log_get_loglevel_name(uint8_t level) { unsigned int i; 
for (i = 0; i <= KNET_LOG_DEBUG; i++) { if (loglevel_names[i].val == level) { return loglevel_names[i].name; } } return "ERROR"; } uint8_t knet_log_get_loglevel_id(const char *name) { unsigned int i; for (i = 0; i <= KNET_LOG_DEBUG; i++) { if (strcasecmp(name, loglevel_names[i].name) == 0) { return loglevel_names[i].val; } } return KNET_LOG_ERR; } int knet_log_set_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t level) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (is_valid_subsystem(subsystem) < 0) { errno = EINVAL; return -1; } if (level > KNET_LOG_DEBUG) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, subsystem, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->log_levels[subsystem] = level; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } int knet_log_get_loglevel(knet_handle_t knet_h, uint8_t subsystem, uint8_t *level) { int savederrno = 0; if (!knet_h) { errno = EINVAL; return -1; } if (is_valid_subsystem(subsystem) < 0) { errno = EINVAL; return -1; } if (!level) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, subsystem, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } *level = knet_h->log_levels[subsystem]; pthread_rwlock_unlock(&knet_h->global_rwlock); return 0; } void log_msg(knet_handle_t knet_h, uint8_t subsystem, uint8_t msglevel, const char *fmt, ...) { va_list ap; struct knet_log_msg msg; size_t byte_cnt = 0; int len, err; if ((!knet_h) || (subsystem == KNET_MAX_SUBSYSTEMS) || (msglevel > knet_h->log_levels[subsystem])) return; /* * most logging calls will take place with locking in place. 
* if we get an EINVAL and locking is initialized, then * we are getting a real error and we need to stop */ err = pthread_rwlock_tryrdlock(&knet_h->global_rwlock); if ((err == EAGAIN) && (knet_h->lock_init_done)) return; if (knet_h->logfd <= 0) goto out_unlock; memset(&msg, 0, sizeof(struct knet_log_msg)); msg.subsystem = subsystem; msg.msglevel = msglevel; va_start(ap, fmt); vsnprintf(msg.msg, sizeof(msg.msg) - 2, fmt, ap); va_end(ap); len = strlen(msg.msg); msg.msg[len+1] = '\n'; while (byte_cnt < sizeof(struct knet_log_msg)) { len = write(knet_h->logfd, &msg, sizeof(struct knet_log_msg) - byte_cnt); if (len <= 0) return; byte_cnt += len; } out_unlock: /* * unlock only if we are holding the lock */ if (!err) pthread_rwlock_unlock(&knet_h->global_rwlock); return; } diff --git a/libknet/threads_send_recv.c b/libknet/threads_send_recv.c index 9cc6f476..71c258ff 100644 --- a/libknet/threads_send_recv.c +++ b/libknet/threads_send_recv.c @@ -1,1204 +1,1204 @@ /* * Copyright (C) 2010-2015 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. 
Di Nitto * Federico Simoncelli * * This software licensed under GPL-2.0+, LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_NETINET_SCTP_H #include #endif #include "crypto.h" #include "compat.h" #include "host.h" #include "link.h" #include "logging.h" #include "transports.h" #include "threads_common.h" #include "threads_send_recv.h" /* * SEND */ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct iovec *iov_out) { int link_idx, msg_idx, sent_msgs, msgs_to_send, prev_sent, progress; struct mmsghdr msg[PCKT_FRAG_MAX]; int err = 0, savederrno = 0; memset(&msg, 0, sizeof(struct mmsghdr)); for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) { msgs_to_send = knet_h->send_to_links_buf[0]->khp_data_frag_num; sent_msgs = 0; prev_sent = 0; progress = 1; retry: msg_idx = 0; while (msg_idx < msgs_to_send) { memset(&msg[msg_idx].msg_hdr, 0, sizeof(struct msghdr)); msg[msg_idx].msg_hdr.msg_name = &dst_host->link[dst_host->active_links[link_idx]].dst_addr; msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx + prev_sent]; msg[msg_idx].msg_hdr.msg_iovlen = 1; msg_idx++; } sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock, msg, msg_idx, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; if ((sent_msgs >= 0) && (sent_msgs < msg_idx)) { if ((sent_msgs) || (progress)) { msgs_to_send = msg_idx - sent_msgs; prev_sent = prev_sent + sent_msgs; if (sent_msgs) { progress = 1; } else { progress = 0; } log_debug(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)", sent_msgs, msg_idx, dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id); goto retry; } if 
(!progress) { savederrno = EAGAIN; err = -1; goto out_unlock; } } if (sent_msgs < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to send data packet to host %s (%u) link %s:%s (%u): %s", dst_host->name, dst_host->host_id, dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr, dst_host->link[dst_host->active_links[link_idx]].status.dst_port, dst_host->link[dst_host->active_links[link_idx]].link_id, strerror(savederrno)); err = -1; goto out_unlock; } if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) && (dst_host->active_link_entries > 1)) { uint8_t cur_link_id = dst_host->active_links[0]; memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1); dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id; break; } } out_unlock: errno = savederrno; return err; } static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inlen, int8_t channel, int is_sync) { ssize_t outlen, frag_len; struct knet_host *dst_host; uint16_t dst_host_ids_temp[KNET_MAX_HOST]; size_t dst_host_ids_entries_temp = 0; uint16_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; struct knet_hostinfo *knet_hostinfo; struct iovec iov_out[PCKT_FRAG_MAX]; uint8_t frag_idx; unsigned int temp_data_mtu; int host_idx; int send_mcast = 0; struct knet_header *inbuf; int savederrno = 0; int err = 0; inbuf = knet_h->recv_from_sock_buf[buf_idx]; if ((knet_h->enabled != 1) && (inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); savederrno = ECANCELED; err = -1; goto out_unlock; } /* * move this into a separate function to expand on * extra switching rules */ switch(inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, inlen, KNET_NOTIFY_TX, 
knet_h->host_id, knet_h->host_id, &channel, dst_host_ids_temp, &dst_host_ids_entries_temp); if (bcast < 0) { log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); savederrno = EFAULT; err = -1; goto out_unlock; } if ((!bcast) && (!dst_host_ids_entries_temp)) { log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); savederrno = EINVAL; err = -1; goto out_unlock; } } break; case KNET_HEADER_TYPE_HOST_INFO: knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; dst_host_ids_temp[0] = knet_hostinfo->khi_dst_node_id; dst_host_ids_entries_temp = 1; knet_hostinfo->khi_dst_node_id = htons(knet_hostinfo->khi_dst_node_id); } break; default: log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket"); savederrno = ENOMSG; err = -1; goto out_unlock; break; } if (is_sync) { if ((bcast) || ((!bcast) && (dst_host_ids_entries_temp > 1))) { log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); savederrno = E2BIG; err = -1; goto out_unlock; } } /* * check destinations hosts before spending time * in fragmenting/encrypting packets to save * time processing data for unrechable hosts. * for unicast, also remap the destination data * to skip unreachable hosts. 
*/ if (!bcast) { dst_host_ids_entries = 0; for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; if (!dst_host) { continue; } if (dst_host->status.reachable) { dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; dst_host_ids_entries++; } } if (!dst_host_ids_entries) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } else { send_mcast = 0; for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { send_mcast = 1; break; } } if (!send_mcast) { savederrno = EHOSTDOWN; err = -1; goto out_unlock; } } if (!knet_h->data_mtu) { /* * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough */ log_debug(knet_h, KNET_SUB_TX, "Received data packet but data MTU is still unknown." " Packet might not be delivered." " Assuming mininum IPv4 mtu (%d)", KNET_PMTUD_MIN_MTU_V4); temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; } else { /* * take a copy of the mtu to avoid value changing under * our feet while we are sending a fragmented pckt */ temp_data_mtu = knet_h->data_mtu; } /* * prepare the outgoing buffers */ frag_len = inlen; frag_idx = 0; inbuf->khp_data_bcast = bcast; inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); inbuf->khp_data_channel = channel; while (frag_idx < inbuf->khp_data_frag_num) { /* * set the iov_base */ iov_out[frag_idx].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; /* * set the len */ if (frag_len > temp_data_mtu) { iov_out[frag_idx].iov_len = temp_data_mtu + KNET_HEADER_DATA_SIZE; } else { iov_out[frag_idx].iov_len = frag_len + KNET_HEADER_DATA_SIZE; } /* * copy the frag info on all buffers */ knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; knet_h->send_to_links_buf[frag_idx]->khp_data_channel = 
inbuf->khp_data_channel; memmove(knet_h->send_to_links_buf[frag_idx]->khp_data_userdata, inbuf->khp_data_userdata + (temp_data_mtu * frag_idx), iov_out[frag_idx].iov_len - KNET_HEADER_DATA_SIZE); frag_len = frag_len - temp_data_mtu; frag_idx++; } if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { dst_host = knet_h->host_index[dst_host_ids[host_idx]]; knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx); frag_idx = 0; while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign( knet_h, (const unsigned char *)knet_h->send_to_links_buf[frag_idx], iov_out[frag_idx].iov_len, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx].iov_len = outlen; } frag_idx++; } err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } } } else { knet_h->send_to_links_buf[0]->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx); frag_idx = 0; while (frag_idx < knet_h->send_to_links_buf[0]->khp_data_frag_num) { knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = knet_h->send_to_links_buf[0]->khp_data_seq_num; if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign( knet_h, (const unsigned char *)knet_h->send_to_links_buf[frag_idx], iov_out[frag_idx].iov_len, knet_h->send_to_links_buf_crypt[frag_idx], &outlen) < 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt unicast packet"); savederrno = ECHILD; err = -1; goto out_unlock; } iov_out[frag_idx].iov_base = knet_h->send_to_links_buf_crypt[frag_idx]; iov_out[frag_idx].iov_len = outlen; } frag_idx++; } for (dst_host = knet_h->host_head; dst_host != 
NULL; dst_host = dst_host->next) { if (dst_host->status.reachable) { err = _dispatch_to_links(knet_h, dst_host, iov_out); savederrno = errno; if (err) { goto out_unlock; } } } } out_unlock: if ((inlen > 0) && (inbuf->kh_type == KNET_HEADER_TYPE_HOST_INFO)) { if (pthread_mutex_lock(&knet_h->host_mutex) != 0) log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); pthread_cond_signal(&knet_h->host_cond); pthread_mutex_unlock(&knet_h->host_mutex); } errno = savederrno; return err; } int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0, err = 0; if (!knet_h) { errno = EINVAL; return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out; } savederrno = pthread_mutex_lock(&knet_h->tx_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s", strerror(savederrno)); err = -1; goto out; } knet_h->recv_from_sock_buf[0]->kh_type = KNET_HEADER_TYPE_DATA; memmove(knet_h->recv_from_sock_buf[0]->khp_data_userdata, buff, buff_len); err = _parse_recv_from_sock(knet_h, 0, buff_len, channel, 1); savederrno = errno; pthread_mutex_unlock(&knet_h->tx_mutex); out: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type) { ssize_t inlen = 0; struct iovec iov_in; int msg_recv, i; int savederrno = 0, docallback = 0; if ((channel >= 0) && (channel < 
KNET_DATAFD_MAX) && (!knet_h->sockfd[channel].is_socket)) { memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)knet_h->recv_from_sock_buf[0]->khp_data_userdata; iov_in.iov_len = KNET_MAX_PACKET_SIZE; inlen = readv(sockfd, &iov_in, 1); if (inlen <= 0) { savederrno = errno; docallback = 1; goto out; } msg_recv = 1; knet_h->recv_from_sock_buf[0]->kh_type = type; _parse_recv_from_sock(knet_h, 0, inlen, channel, 0); } else { msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { inlen = msg_recv; savederrno = errno; docallback = 1; goto out; } for (i = 0; i < msg_recv; i++) { inlen = msg[i].msg_len; if (inlen == 0) { savederrno = 0; docallback = 1; goto out; break; } knet_h->recv_from_sock_buf[i]->kh_type = type; _parse_recv_from_sock(knet_h, i, inlen, channel, 0); } } out: if (inlen < 0) { struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); if (epoll_ctl(knet_h->send_to_links_epollfd, EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) { log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s", knet_h->sockfd[channel].sockfd[0], strerror(savederrno)); } else { knet_h->sockfd[channel].has_error = 1; } } if (docallback) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_TX, inlen, savederrno); } } void *_handle_send_to_links_thread(void *data) { knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; int i, nev, type; int8_t channel; memset(&msg, 0, sizeof(struct mmsghdr)); /* preparing data buffer */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_sock_buf[i]->khp_data_userdata; iov_in[i].iov_len = KNET_MAX_PACKET_SIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); 
msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; knet_h->recv_from_sock_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->recv_from_sock_buf[i]->khp_data_frag_seq = 0; knet_h->recv_from_sock_buf[i]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION; knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1; knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id); } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, -1); if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock"); continue; } for (i = 0; i < nev; i++) { if (events[i].data.fd == knet_h->hostsockfd[0]) { type = KNET_HEADER_TYPE_HOST_INFO; channel = -1; } else { type = KNET_HEADER_TYPE_DATA; for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) { break; } } } if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) { log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); pthread_rwlock_unlock(&knet_h->listener_rwlock); continue; } _handle_send_to_links(knet_h, events[i].data.fd, channel, msg, type); pthread_mutex_unlock(&knet_h->tx_mutex); } pthread_rwlock_unlock(&knet_h->global_rwlock); } return NULL; } /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. 
(-1 on errors)
 *
 * The source host is looked up from inbuf->kh_node. The buffer already
 * tracking inbuf->khp_data_seq_num is preferred, then a free one, then
 * the one with the oldest last_update (which is marked not in use so the
 * caller re-initializes it). On -1, errno is set to ETIME.
 */
static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
	int i, oldest;

	/*
	 * check if there is a buffer already in use handling the same seq_num
	 */
	for (i = 0; i < KNET_MAX_LINK; i++) {
		if (src_host->defrag_buf[i].in_use) {
			if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
				return i;
			}
		}
	}

	/*
	 * If there is no buffer that's handling the current seq_num
	 * either it's new or it's been reclaimed already.
	 * check if it's been reclaimed/seen before using the defrag circular
	 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
	 * and there is no point to try to defrag it again.
	 */
	if (!_seq_num_lookup(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1)) {
		errno = ETIME;
		return -1;
	}

	/*
	 * register the pckt as seen
	 */
	_seq_num_set(src_host, inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 1);

	/*
	 * see if there is a free buffer
	 */
	for (i = 0; i < KNET_MAX_LINK; i++) {
		if (!src_host->defrag_buf[i].in_use) {
			return i;
		}
	}

	/*
	 * at this point, there are no free buffers, the pckt is new
	 * and we need to reclaim a buffer, and we will take the one
	 * with the oldest timestamp. It's as good as any.
	 */

	oldest = 0;

	for (i = 0; i < KNET_MAX_LINK; i++) {
		if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
			oldest = i;
		}
	}
	/* mark it free so that the caller wipes it before reuse */
	src_host->defrag_buf[oldest].in_use = 0;
	return oldest;
}

/*
 * Store one fragment of a fragmented packet and reassemble the packet
 * once every fragment has arrived.
 *
 * inbuf - header + payload of the fragment just received
 * len   - in: payload length of this fragment
 *         out (only when 0 is returned): length of the reassembled payload
 *
 * Returns 0 when the packet is complete (the reassembled payload has been
 * copied back into inbuf->khp_data_userdata), 1 when there is nothing to
 * deliver yet (fragments missing, duplicate fragment, or no defrag buffer
 * available).
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		if (errno == ETIME) {
-			log_debug(knet_h, KNET_SUB_LINK_T, "Defrag buffer expired");
+			log_debug(knet_h, KNET_SUB_RX, "Defrag buffer expired");
		}
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 *  we need to handle the last packet with gloves due to its different size
	 */

	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
			       inbuf->khp_data_userdata,
			       *len);
		}
	} else {
		/* every fragment but the last one carries the full fragment size */
		defrag_buf->frag_size = *len;
	}

	/*
	 * fragment sequence numbers are 1-based, hence the "- 1".
	 * NOTE(review): this copy also runs for a last fragment that arrived
	 * first, while frag_size is still 0 - it then lands at offset 0;
	 * presumably overwritten when fragment 1 arrives, confirm.
	 */
	memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
	       inbuf->khp_data_userdata, *len);

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/*
		 * special case the last pckt
		 */

		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
			        defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */

		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}

static void _parse_recv_from_links(knet_handle_t knet_h, struct sockaddr_storage *address, int ind, ssize_t len)
{
	ssize_t outlen;
	struct knet_host *src_host;
	struct knet_link *src_link;
	unsigned long long latency_last;
	uint16_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	struct timespec recvtime;
	struct knet_header *inbuf = knet_h->recv_from_links_buf[ind];
	unsigned char *outbuf = (unsigned char *)knet_h->recv_from_links_buf[ind];
	struct knet_hostinfo *knet_hostinfo;
	struct iovec iov_out[1];
	int8_t channel;

	if (knet_h->crypto_instance) {
		if (crypto_authenticate_and_decrypt(knet_h,
						    (unsigned char *)inbuf,
						    len,
						    knet_h->recv_from_links_buf_decrypt,
						    &outlen) < 0) {
-			log_debug(knet_h, KNET_SUB_LINK_T, "Unable to decrypt/auth packet");
+			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
			return;
		}
		len = outlen;
		inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
	}

	if (len <
(KNET_HEADER_SIZE + 1)) { - log_debug(knet_h, KNET_SUB_LINK_T, "Packet is too short: %ld", len); + log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", len); return; } if (inbuf->kh_version != KNET_HEADER_VERSION) { - log_debug(knet_h, KNET_SUB_LINK_T, "Packet version does not match"); + log_debug(knet_h, KNET_SUB_RX, "Packet version does not match"); return; } inbuf->kh_node = ntohs(inbuf->kh_node); src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to find source host for this packet"); + log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } src_link = NULL; if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) { src_link = src_host->link + (inbuf->khp_ping_link % KNET_MAX_LINK); if (src_link->dynamic == KNET_LINK_DYNIP) { if (memcmp(&src_link->dst_addr, address, sizeof(struct sockaddr_storage)) != 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u appears to have changed ip address", + log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, address, sizeof(struct sockaddr_storage)); if (getnameinfo((const struct sockaddr *)&src_link->dst_addr, sizeof(struct sockaddr_storage), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN, NI_NUMERICHOST | NI_NUMERICSERV) != 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to resolve ???"); + log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } } src_link->status.dynconnected = 1; } } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_HOST_INFO: case KNET_HEADER_TYPE_DATA: inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); channel = inbuf->khp_data_channel; if (!_seq_num_lookup(src_host, 
inbuf->khp_data_bcast, inbuf->khp_data_seq_num, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { - log_debug(knet_h, KNET_SUB_LINK_T, "Packet has already been delivered"); + log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (inbuf->khp_data_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ len = len - KNET_HEADER_DATA_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } len = len + KNET_HEADER_DATA_SIZE; } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (knet_h->enabled != 1) /* data forward is disabled */ break; if (knet_h->dst_host_filter_fn) { int host_idx; int found = 0; bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, (const unsigned char *)inbuf->khp_data_userdata, len - KNET_HEADER_DATA_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, &channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Error from dst_host_filter_fn: %d", bcast); + log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return; } if ((!bcast) && (!dst_host_ids_entries)) { - log_debug(knet_h, KNET_SUB_LINK_T, "Message is unicast but no dst_host_ids_entries"); + log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return; } /* check if we are dst for this packet */ if (!bcast) { for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { - log_debug(knet_h, KNET_SUB_LINK_T, "Packet is not for us"); + log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return; } } } } if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) { if (!knet_h->sockfd[channel].in_use) { - log_debug(knet_h, KNET_SUB_LINK_T, + log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there 
is no local sock connected", channel); return; } memset(iov_out, 0, sizeof(iov_out)); iov_out[0].iov_base = (void *) inbuf->khp_data_userdata; iov_out[0].iov_len = len - KNET_HEADER_DATA_SIZE; outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return; } if (outlen == iov_out[0].iov_len) { _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0); } } else { /* HOSTINFO */ knet_hostinfo = (struct knet_hostinfo *)inbuf->khp_data_userdata; if (knet_hostinfo->khi_bcast == KNET_HOSTINFO_UCAST) { bcast = 0; knet_hostinfo->khi_dst_node_id = ntohs(knet_hostinfo->khi_dst_node_id); } if (!_seq_num_lookup(src_host, bcast, inbuf->khp_data_seq_num, 0)) { return; } _seq_num_set(src_host, bcast, inbuf->khp_data_seq_num, 0); switch(knet_hostinfo->khi_type) { case KNET_HOSTINFO_TYPE_LINK_UP_DOWN: src_link = src_host->link + (knet_hostinfo->khip_link_status_link_id % KNET_MAX_LINK); /* * basically if the node is coming back to life from a crash * we should receive a host info where local previous status == remote current status * and so we can detect that node is showing up again * we need to clear cbuffers and notify the node of our status by resending our host info */ if ((src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_UP) && (src_link->remoteconnected == knet_hostinfo->khip_link_status_status)) { src_link->host_info_up_sent = 0; } src_link->remoteconnected = knet_hostinfo->khip_link_status_status; if (src_link->remoteconnected == KNET_HOSTINFO_LINK_STATUS_DOWN) { /* * if a host is disconnecting clean, we note that in donnotremoteupdate * so that we don't send host info back immediately but we wait * for the node to send an update when it's alive again */ src_link->host_info_up_sent = 0; src_link->donnotremoteupdate = 1; } else { src_link->donnotremoteupdate = 0; } - 
log_debug(knet_h, KNET_SUB_LINK_T, "host message up/down. from host: %u link: %u remote connected: %u", + log_debug(knet_h, KNET_SUB_RX, "host message up/down. from host: %u link: %u remote connected: %u", src_host->host_id, src_link->link_id, src_link->remoteconnected); if (_host_dstcache_update_async(knet_h, src_host)) { - log_debug(knet_h, KNET_SUB_LINK_T, + log_debug(knet_h, KNET_SUB_RX, "Unable to update switch cache for host: %u link: %u remote connected: %u)", src_host->host_id, src_link->link_id, src_link->remoteconnected); } break; case KNET_HOSTINFO_TYPE_LINK_TABLE: break; default: - log_warn(knet_h, KNET_SUB_LINK_T, "Receiving unknown host info message from host %u", src_host->host_id); + log_warn(knet_h, KNET_SUB_RX, "Receiving unknown host info message from host %u", src_host->host_id); break; } } break; case KNET_HEADER_TYPE_PING: outlen = KNET_HEADER_PING_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PONG; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt pong packet"); + log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } if (sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)) != outlen) { - log_debug(knet_h, KNET_SUB_LINK_T, + log_debug(knet_h, KNET_SUB_RX, "Unable to send pong reply (sock: %d) packet (sendto): %d %s. 
recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); } break; case KNET_HEADER_TYPE_PONG: clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last); memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec)); timespec_diff(recvtime, src_link->status.pong_last, &latency_last); src_link->status.latency = ((src_link->status.latency * src_link->latency_exp) + ((latency_last / 1000llu) * (src_link->latency_fix - src_link->latency_exp))) / src_link->latency_fix; if (src_link->status.latency < src_link->pong_timeout) { if (!src_link->status.connected) { if (src_link->received_pong >= src_link->pong_count) { - log_info(knet_h, KNET_SUB_LINK_T, "host: %u link: %u is up", + log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up", src_host->host_id, src_link->link_id); _link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1); } else { src_link->received_pong++; - log_debug(knet_h, KNET_SUB_LINK_T, "host: %u link: %u received pong: %u", + log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u", src_host->host_id, src_link->link_id, src_link->received_pong); } } } break; case KNET_HEADER_TYPE_PMTUD: outlen = KNET_HEADER_PMTUD_SIZE; inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY; inbuf->kh_node = htons(knet_h->host_id); if (knet_h->crypto_instance) { if (crypto_encrypt_and_sign(knet_h, (const unsigned char *)inbuf, len, knet_h->recv_from_links_buf_crypt, &outlen) < 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to encrypt PMTUd reply packet"); + log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet"); break; } outbuf = knet_h->recv_from_links_buf_crypt; } if (sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, (struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage)) != outlen) { - log_debug(knet_h, 
KNET_SUB_LINK_T, + log_debug(knet_h, KNET_SUB_RX, "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s", src_link->outsock, errno, strerror(errno), src_link->status.src_ipaddr, src_link->status.src_port, src_link->status.dst_ipaddr, src_link->status.dst_port); } break; case KNET_HEADER_TYPE_PMTUD_REPLY: if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get mutex lock"); + log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock"); break; } src_link->last_recv_mtu = inbuf->khp_pmtud_size; pthread_cond_signal(&knet_h->pmtud_cond); pthread_mutex_unlock(&knet_h->pmtud_mutex); break; default: return; } } /* * _handle_recv_from_links: drain one ready socket. * Takes global_rwlock for reading, then pulls up to PCKT_FRAG_MAX messages from sockfd into msg with a single non-blocking recvmmsg() call. * Each message with a non-zero length is handed to _parse_recv_from_links() together with its index in msg. * A zero-length message, or recvmmsg() returning 0, is handled as EOF by calling _close_socket(). * When HAVE_NETINET_SCTP_H is defined, messages flagged MSG_NOTIFICATION are routed to _handle_socket_notification() instead of the parser. * The read lock is released on every path that acquired it. * NOTE(review): the address argument passed to _parse_recv_from_links() is &msg[i].msg_hdr.msg_name, i.e. the address of the pointer field and not the buffer that field points to - confirm this is intended. */ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg) { int i, msg_recv; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { - log_debug(knet_h, KNET_SUB_LINK_T, "Unable to get read lock"); + log_debug(knet_h, KNET_SUB_RX, "Unable to get read lock"); return; } msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL); if (msg_recv < 0) { - log_err(knet_h, KNET_SUB_LINK_T, "No message received from recvmmsg: %s", strerror(errno)); + log_err(knet_h, KNET_SUB_RX, "No message received from recvmmsg: %s", strerror(errno)); goto exit_unlock; } /* nothing was read at all: handled as EOF, the loop below then runs zero times */ if (msg_recv == 0) { _close_socket(knet_h, sockfd); } for (i = 0; i < msg_recv; i++) { #ifdef HAVE_NETINET_SCTP_H if (msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION) { _handle_socket_notification(knet_h, sockfd, msg[i].msg_hdr.msg_iov, msg[i].msg_hdr.msg_iovlen); continue; } #endif if (msg[i].msg_len == 0) { _close_socket(knet_h, sockfd); goto exit_unlock; } else { _parse_recv_from_links(knet_h, (struct sockaddr_storage *)&msg[i].msg_hdr.msg_name, i, msg[i].msg_len); } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event
events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_FRAG_MAX]; struct mmsghdr msg[PCKT_FRAG_MAX]; struct iovec iov_in[PCKT_FRAG_MAX]; /* * Body of _handle_recv_from_links_thread: data is the knet handle. * Prepares PCKT_FRAG_MAX receive slots once, then loops on epoll_wait() until shutdown_in_progress() reports true, passing every ready fd to _handle_recv_from_links(). Always returns NULL. */ /* NOTE(review): this clears sizeof(struct mmsghdr) bytes, i.e. a single element of msg and not the whole array; the loop below zeroes each msg_hdr but not msg_len - confirm that is sufficient. */ memset(&msg, 0, sizeof(struct mmsghdr)); /* one slot per index: msg[i] receives into recv_from_links_buf[i] (KNET_DATABUFSIZE bytes) and stores the sender address in address[i] */ for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } /* wait with no timeout (-1); a negative epoll_wait() result simply skips the inner loop and the shutdown flag is checked again */ while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, -1); for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } return NULL; } diff --git a/libknet/transport_common.c b/libknet/transport_common.c index 2996181a..afacd04b 100644 --- a/libknet/transport_common.c +++ b/libknet/transport_common.c @@ -1,177 +1,177 @@ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "host.h" #include "link.h" #include "logging.h" #include "common.h" #include "transports.h" #include "../common/netutils.h" /* * _configure_transport_socket: apply the common option set to a transport socket. * In order: forced receive and send buffer sizes, IP_FREEBIND, IPv6-only plus IPV6_PMTUDISC_PROBE for AF_INET6 addresses or IP_PMTUDISC_PROBE otherwise, SO_REUSEADDR, then the CLOEXEC and NONBLOCK settings via _fdset_cloexec() and _fdset_nonblock(). * type is only used to label the log messages. * Returns 0 on success; on the first failure the error is logged and -1 is returned without attempting the remaining options. * NOTE(review): KNET_RING_RCVBUFF is used for the send buffer as well as the receive buffer - confirm this is intended. */ int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type) { int err = 0; int value; int savederrno; value = KNET_RING_RCVBUFF; if (setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s receive buffer: %s", type, strerror(savederrno)); goto exit_error; } value = KNET_RING_RCVBUFF; if (setsockopt(sock, SOL_SOCKET, SO_SNDBUFFORCE, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s send buffer: %s", type, strerror(savederrno)); goto exit_error; } value = 1; if (setsockopt(sock, SOL_IP, IP_FREEBIND, &value, sizeof(value))
<0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set FREEBIND on %s socket: %s", type, strerror(savederrno)); goto exit_error; } /* path MTU discovery is switched to PROBE mode with the option that matches the address family; AF_INET6 sockets are additionally restricted to IPv6 only */ if (address->ss_family == AF_INET6) { value = 1; if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s IPv6 only: %s", type, strerror(savederrno)); goto exit_error; } value = IPV6_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IPV6, IPV6_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } } else { value = IP_PMTUDISC_PROBE; if (setsockopt(sock, SOL_IP, IP_MTU_DISCOVER, &value, sizeof(value)) <0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set PMTUDISC on %s socket: %s", type, strerror(savederrno)); goto exit_error; } } value = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s reuseaddr: %s", type, strerror(savederrno)); goto exit_error; } /* descriptor-level settings go through the _fdset_* helpers; a non-zero return from either is treated as failure */ if (_fdset_cloexec(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s CLOEXEC socket opts: %s", type, strerror(savederrno)); goto exit_error; } if (_fdset_nonblock(sock)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set %s NONBLOCK socket opts: %s", type, strerror(savederrno)); goto exit_error; } err = 0; /* savederrno is only used for the log messages; errno itself is not restored before returning */ exit_error: return err; } /* * _close_socket: handle EOF on a receive socket. * Logs the event, removes sockfd from the recv_from_links epoll set (a failed removal is logged and otherwise ignored), then offers the fd to the handle_fd_eof callbacks found in knet_h->transport_ops. */ void _close_socket(knet_handle_t knet_h, int sockfd) { struct epoll_event ev; int i; - log_err(knet_h, KNET_SUB_LINK_T, "EOF received on socket fd %d", sockfd); + log_err(knet_h, KNET_SUB_RX, "EOF received on socket fd %d", sockfd); memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL,
sockfd, &ev)) { log_err(knet_h, KNET_SUB_LISTENER, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); } /* Tell the transports that the FD has been closed: the scan stops at the first handle_fd_eof callback that returns 0. NOTE(review): the loop header below appears truncated (no condition or increment is visible) - confirm against the original file. */ for (i=0; itransport_ops[i]) && (knet_h->transport_ops[i]->handle_fd_eof) && (!knet_h->transport_ops[i]->handle_fd_eof(knet_h, sockfd))) break; } } /* * _handle_socket_notification: pass a notification message received on sockfd (the iov array and its iovlen entry count) to the handle_fd_notification callbacks found in knet_h->transport_ops. */ void _handle_socket_notification(knet_handle_t knet_h, int sockfd, struct iovec *iov, size_t iovlen) { int i; /* Find the transport and post the message: the scan stops at the first handle_fd_notification callback that returns non-zero. NOTE(review): this loop header appears truncated as well - confirm against the original file. */ for (i=0; itransport_ops[i]) && (knet_h->transport_ops[i]->handle_fd_notification) && (knet_h->transport_ops[i]->handle_fd_notification(knet_h, sockfd, iov, iovlen))) break; } } /* * Wrappers for addrtostr() & addrtostr_free() for use when we only need the IP address * printing in DEBUG mode - it is too heavy for normal use. * With DEBUG defined both wrappers forward to the real functions. * Without DEBUG, _transport_addrtostr() stores the string literals "node" and "" in str[0] and str[1] and returns 0, and _transport_addrtostr_free() does nothing. */ int _transport_addrtostr(const struct sockaddr *sa, socklen_t salen, char *str[2]) { #ifdef DEBUG return addrtostr(sa, salen, str); #else str[0] = (char*)"node"; str[1] = (char*)""; return 0; #endif } void _transport_addrtostr_free(char *str[2]) { #ifdef DEBUG addrtostr_free(str); #else #endif }