diff --git a/exec/totemip.c b/exec/totemip.c index 7ba746e9..28a88365 100644 --- a/exec/totemip.c +++ b/exec/totemip.c @@ -1,490 +1,512 @@ /* * Copyright (c) 2005-2011 Red Hat, Inc. * * All rights reserved. * * Author: Patrick Caulfield (pcaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* IPv4/6 abstraction */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOCALHOST_IPV4 "127.0.0.1" #define LOCALHOST_IPV6 "::1" #define NETLINK_BUFSIZE 16384 #ifdef SO_NOSIGPIPE void totemip_nosigpipe(int s) { int on = 1; setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void *)&on, sizeof(on)); } #endif /* Compare two addresses */ int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2) { int addrlen = 0; if (addr1->family != addr2->family) return 0; if (addr1->family == AF_INET) { addrlen = sizeof(struct in_addr); } if (addr1->family == AF_INET6) { addrlen = sizeof(struct in6_addr); } assert(addrlen); if (memcmp(addr1->addr, addr2->addr, addrlen) == 0) return 1; else return 0; } /* Copy a totem_ip_address */ void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2) { memcpy(addr1, addr2, sizeof(struct totem_ip_address)); } void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2) { addr1->nodeid = swab32(addr2->nodeid); addr1->family = swab16(addr2->family); memcpy(addr1->addr, addr2->addr, TOTEMIP_ADDRLEN); } /* * Multicast address range is 224.0.0.0 to 239.255.255.255 this * translates to the first 4 bits == 1110 (0xE). * http://en.wikipedia.org/wiki/Multicast_address */ int32_t totemip_is_mcast(struct totem_ip_address *ip_addr) { uint32_t addr = 0; memcpy (&addr, ip_addr->addr, sizeof (uint32_t)); if (ip_addr->family == AF_INET) { addr = ntohl(addr); if ((addr >> 28) != 0xE) { return -1; } } return 0; } /* For sorting etc. params are void * for qsort's benefit */ int totemip_compare(const void *a, const void *b) { int i; const struct totem_ip_address *totemip_a = (const struct totem_ip_address *)a; const struct totem_ip_address *totemip_b = (const struct totem_ip_address *)b; struct in_addr ipv4_a1; struct in_addr ipv4_a2; struct in6_addr ipv6_a1; struct in6_addr ipv6_a2; unsigned short family; /* * Use memcpy to align since totem_ip_address is unaligned on various archs */ memcpy (&family, &totemip_a->family, sizeof (unsigned short)); if (family == AF_INET) { memcpy (&ipv4_a1, totemip_a->addr, sizeof (struct in_addr)); memcpy (&ipv4_a2, totemip_b->addr, sizeof (struct in_addr)); if (ipv4_a1.s_addr == ipv4_a2.s_addr) { return (0); } if (htonl(ipv4_a1.s_addr) < htonl(ipv4_a2.s_addr)) { return -1; } else { return +1; } } else if (family == AF_INET6) { /* * We can only compare 8 bits at time for portability reasons */ memcpy (&ipv6_a1, totemip_a->addr, sizeof (struct in6_addr)); memcpy (&ipv6_a2, totemip_b->addr, sizeof (struct in6_addr)); for (i = 0; i < 16; i++) { int res = ipv6_a1.s6_addr[i] - ipv6_a2.s6_addr[i]; if (res) { return res; } } return 0; } else { /* * Family not set, should be! */ assert (0); } return 0; } /* Build a localhost totem_ip_address */ int totemip_localhost(int family, struct totem_ip_address *localhost) { const char *addr_text; memset (localhost, 0, sizeof (struct totem_ip_address)); if (family == AF_INET) { addr_text = LOCALHOST_IPV4; if (inet_pton(family, addr_text, (char *)&localhost->nodeid) <= 0) { return -1; } } else { addr_text = LOCALHOST_IPV6; } if (inet_pton(family, addr_text, (char *)localhost->addr) <= 0) return -1; localhost->family = family; return 0; } int totemip_localhost_check(const struct totem_ip_address *addr) { struct totem_ip_address localhost; if (totemip_localhost(addr->family, &localhost)) return 0; return totemip_equal(addr, &localhost); } const char *totemip_print(const struct totem_ip_address *addr) { static char buf[INET6_ADDRSTRLEN]; return (inet_ntop(addr->family, addr->addr, buf, sizeof(buf))); } /* Make a totem_ip_address into a usable sockaddr_storage */ int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen) { int ret = -1; if (ip_addr->family == AF_INET) { struct sockaddr_in *sin = (struct sockaddr_in *)saddr; memset(sin, 0, sizeof(struct sockaddr_in)); #ifdef HAVE_SOCK_SIN_LEN sin->sin_len = sizeof(struct sockaddr_in); #endif sin->sin_family = ip_addr->family; sin->sin_port = ntohs(port); memcpy(&sin->sin_addr, ip_addr->addr, sizeof(struct in_addr)); *addrlen = sizeof(struct sockaddr_in); ret = 0; } if (ip_addr->family == AF_INET6) { struct sockaddr_in6 *sin = (struct sockaddr_in6 *)saddr; memset(sin, 0, sizeof(struct sockaddr_in6)); #ifdef HAVE_SOCK_SIN6_LEN sin->sin6_len = sizeof(struct sockaddr_in6); #endif sin->sin6_family = ip_addr->family; sin->sin6_port = ntohs(port); sin->sin6_scope_id = 2; memcpy(&sin->sin6_addr, ip_addr->addr, sizeof(struct in6_addr)); *addrlen = sizeof(struct sockaddr_in6); ret = 0; } return ret; } /* Converts an address string string into a totem_ip_address. family can be AF_INET, AF_INET6 or 0 ("for "don't care") */ int totemip_parse(struct totem_ip_address *totemip, const char *addr, int family) { struct addrinfo *ainfo; struct addrinfo ahints; struct sockaddr_in *sa; struct sockaddr_in6 *sa6; int ret; memset(&ahints, 0, sizeof(ahints)); ahints.ai_socktype = SOCK_DGRAM; ahints.ai_protocol = IPPROTO_UDP; ahints.ai_family = family; /* Lookup the nodename address */ ret = getaddrinfo(addr, NULL, &ahints, &ainfo); if (ret) return -1; sa = (struct sockaddr_in *)ainfo->ai_addr; sa6 = (struct sockaddr_in6 *)ainfo->ai_addr; totemip->family = ainfo->ai_family; if (ainfo->ai_family == AF_INET) memcpy(totemip->addr, &sa->sin_addr, sizeof(struct in_addr)); else memcpy(totemip->addr, &sa6->sin6_addr, sizeof(struct in6_addr)); freeaddrinfo(ainfo); return 0; } /* Make a sockaddr_* into a totem_ip_address */ int totemip_sockaddr_to_totemip_convert(const struct sockaddr_storage *saddr, struct totem_ip_address *ip_addr) { int ret = -1; ip_addr->family = saddr->ss_family; ip_addr->nodeid = 0; if (saddr->ss_family == AF_INET) { const struct sockaddr_in *sin = (const struct sockaddr_in *)saddr; memcpy(ip_addr->addr, &sin->sin_addr, sizeof(struct in_addr)); ret = 0; } if (saddr->ss_family == AF_INET6) { const struct sockaddr_in6 *sin = (const struct sockaddr_in6 *)saddr; memcpy(ip_addr->addr, &sin->sin6_addr, sizeof(struct in6_addr)); ret = 0; } return ret; } int totemip_getifaddrs(struct list_head *addrs) { struct ifaddrs *ifap, *ifa; struct totem_ip_if_address *if_addr; if (getifaddrs(&ifap) != 0) return (-1); list_init(addrs); for (ifa = ifap; ifa; ifa = ifa->ifa_next) { if (ifa->ifa_addr == NULL || ifa->ifa_netmask == NULL) continue ; if ((ifa->ifa_addr->sa_family != AF_INET && ifa->ifa_addr->sa_family != AF_INET6) || (ifa->ifa_netmask->sa_family != AF_INET && ifa->ifa_netmask->sa_family != AF_INET6 && ifa->ifa_netmask->sa_family != 0)) continue ; if (ifa->ifa_netmask->sa_family == 0) { ifa->ifa_netmask->sa_family = ifa->ifa_addr->sa_family; } if_addr = malloc(sizeof(struct totem_ip_if_address)); if (if_addr == NULL) { goto error_free_ifaddrs; } list_init(&if_addr->list); memset(if_addr, 0, sizeof(struct totem_ip_if_address)); if_addr->interface_up = ifa->ifa_flags & IFF_UP; if_addr->interface_num = if_nametoindex(ifa->ifa_name); if_addr->name = strdup(ifa->ifa_name); if (if_addr->name == NULL) { goto error_free_addr; } if (totemip_sockaddr_to_totemip_convert((const struct sockaddr_storage *)ifa->ifa_addr, &if_addr->ip_addr) == -1) { goto error_free_addr_name; } if (totemip_sockaddr_to_totemip_convert((const struct sockaddr_storage *)ifa->ifa_netmask, &if_addr->mask_addr) == -1) { goto error_free_addr_name; } list_add_tail(&if_addr->list, addrs); } freeifaddrs(ifap); return (0); error_free_addr_name: free(if_addr->name); error_free_addr: free(if_addr); error_free_ifaddrs: totemip_freeifaddrs(addrs); freeifaddrs(ifap); return (-1); } void totemip_freeifaddrs(struct list_head *addrs) { struct totem_ip_if_address *if_addr; struct list_head *list; for (list = addrs->next; list != addrs;) { if_addr = list_entry(list, struct totem_ip_if_address, list); list = list->next; free(if_addr->name); list_del(&if_addr->list); free(if_addr); } list_init(addrs); } int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit) { struct list_head addrs; struct list_head *list; struct totem_ip_if_address *if_addr; struct totem_ip_address bn_netaddr, if_netaddr; socklen_t addr_len; socklen_t si; int res = -1; int exact_match_found = 0; int net_match_found = 0; *interface_up = 0; *interface_num = 0; if (totemip_getifaddrs(&addrs) == -1) { return (-1); } for (list = addrs.next; list != &addrs; list = list->next) { if_addr = list_entry(list, struct totem_ip_if_address, list); if (bindnet->family != if_addr->ip_addr.family) continue ; addr_len = 0; switch (bindnet->family) { case AF_INET: addr_len = sizeof(struct in_addr); break; case AF_INET6: addr_len = sizeof(struct in6_addr); break; } if (addr_len == 0) continue ; totemip_copy(&bn_netaddr, bindnet); totemip_copy(&if_netaddr, &if_addr->ip_addr); if (totemip_equal(&bn_netaddr, &if_netaddr)) { exact_match_found = 1; } for (si = 0; si < addr_len; si++) { bn_netaddr.addr[si] = bn_netaddr.addr[si] & if_addr->mask_addr.addr[si]; if_netaddr.addr[si] = if_netaddr.addr[si] & if_addr->mask_addr.addr[si]; } if (exact_match_found || (!net_match_found && totemip_equal(&bn_netaddr, &if_netaddr))) { totemip_copy(boundto, &if_addr->ip_addr); boundto->nodeid = bindnet->nodeid; *interface_up = if_addr->interface_up; *interface_num = if_addr->interface_num; if (boundto->family == AF_INET && boundto->nodeid == 0) { unsigned int nodeid = 0; memcpy (&nodeid, boundto->addr, sizeof (int)); #if __BYTE_ORDER == __LITTLE_ENDIAN nodeid = swab32 (nodeid); #endif if (mask_high_bit) { nodeid &= 0x7FFFFFFF; } boundto->nodeid = nodeid; } net_match_found = 1; res = 0; if (exact_match_found) { goto finished; } } } finished: totemip_freeifaddrs(&addrs); return (res); } + +#define TOTEMIP_UDP_HEADER_SIZE 8 +#define TOTEMIP_IPV4_HEADER_SIZE 20 +#define TOTEMIP_IPV6_HEADER_SIZE 40 + +size_t totemip_udpip_header_size(int family) +{ + size_t header_size; + + header_size = 0; + + switch (family) { + case AF_INET: + header_size = TOTEMIP_UDP_HEADER_SIZE + TOTEMIP_IPV4_HEADER_SIZE; + break; + case AF_INET6: + header_size = TOTEMIP_UDP_HEADER_SIZE + TOTEMIP_IPV6_HEADER_SIZE; + break; + } + + return (header_size); +} diff --git a/exec/totemudp.c b/exec/totemudp.c index 45771073..86059af6 100644 --- a/exec/totemudp.c +++ b/exec/totemudp.c @@ -1,1424 +1,1426 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. * Copyright (c) 2006-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemudp.h" #include "util.h" #include "totemcrypto.h" #include #include #include #include #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) #define NETIF_STATE_REPORT_UP 1 #define NETIF_STATE_REPORT_DOWN 2 #define BIND_STATE_UNBOUND 0 #define BIND_STATE_REGULAR 1 #define BIND_STATE_LOOPBACK 2 #define MESSAGE_TYPE_MEMB_JOIN 3 struct totemudp_socket { int mcast_recv; int mcast_send; int token; /* * Socket used for local multicast delivery. We don't rely on multicast * loop and rather this UNIX DGRAM socket is used. Socket is created by * socketpair call and they are used in same way as pipe (so [0] is read * end and [1] is write end) */ int local_mcast_loop[2]; }; struct totemudp_instance { struct crypto_instance *crypto_inst; qb_loop_t *totemudp_poll_handle; struct totem_interface *totem_interface; int netif_state_report; int netif_bind_state; void *context; void (*totemudp_deliver_fn) ( void *context, const void *msg, unsigned int msg_len); void (*totemudp_iface_change_fn) ( void *context, const struct totem_ip_address *iface_address); void (*totemudp_target_set_completed) (void *context); /* * Function and data used to log messages */ int totemudp_log_level_security; int totemudp_log_level_error; int totemudp_log_level_warning; int totemudp_log_level_notice; int totemudp_log_level_debug; int totemudp_subsys_id; void (*totemudp_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...)__attribute__((format(printf, 6, 7))); void *udp_context; char iov_buffer[FRAME_SIZE_MAX]; char iov_buffer_flush[FRAME_SIZE_MAX]; struct iovec totemudp_iov_recv; struct iovec totemudp_iov_recv_flush; struct totemudp_socket totemudp_sockets; struct totem_ip_address mcast_address; int stats_sent; int stats_recv; int stats_delv; int stats_remcasts; int stats_orf_token; struct timeval stats_tv_start; struct totem_ip_address my_id; int firstrun; qb_loop_timer_handle timer_netif_check_timeout; unsigned int my_memb_entries; int flushing; struct totem_config *totem_config; totemsrp_stats_t *stats; struct totem_ip_address token_target; }; struct work_item { const void *msg; unsigned int msg_len; struct totemudp_instance *instance; }; static int totemudp_build_sockets ( struct totemudp_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *mcastaddress, struct totemudp_socket *sockets, struct totem_ip_address *bound_to); static struct totem_ip_address localhost; static void totemudp_instance_initialize (struct totemudp_instance *instance) { memset (instance, 0, sizeof (struct totemudp_instance)); instance->netif_state_report = NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN; instance->totemudp_iov_recv.iov_base = instance->iov_buffer; instance->totemudp_iov_recv.iov_len = FRAME_SIZE_MAX; //sizeof (instance->iov_buffer); instance->totemudp_iov_recv_flush.iov_base = instance->iov_buffer_flush; instance->totemudp_iov_recv_flush.iov_len = FRAME_SIZE_MAX; //sizeof (instance->iov_buffer); /* * There is always atleast 1 processor */ instance->my_memb_entries = 1; } #define log_printf(level, format, args...) \ do { \ instance->totemudp_log_printf ( \ level, instance->totemudp_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ (const char *)format, ##args); \ } while (0); #define LOGSYS_PERROR(err_num, level, fmt, args...) \ do { \ char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ instance->totemudp_log_printf ( \ level, instance->totemudp_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ } while(0) int totemudp_crypto_set ( void *udp_context, const char *cipher_type, const char *hash_type) { return (0); } static inline void ucast_sendmsg ( struct totemudp_instance *instance, struct totem_ip_address *system_to, const void *msg, unsigned int msg_len) { struct msghdr msg_ucast; int res = 0; size_t buf_out_len; unsigned char buf_out[FRAME_SIZE_MAX]; struct sockaddr_storage sockaddr; struct iovec iovec; int addrlen; /* * Encrypt and digest the message */ if (crypto_encrypt_and_sign ( instance->crypto_inst, (const unsigned char *)msg, msg_len, buf_out, &buf_out_len) != 0) { log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)"); return; } iovec.iov_base = (void *)buf_out; iovec.iov_len = buf_out_len; /* * Build unicast message */ memset(&msg_ucast, 0, sizeof(msg_ucast)); totemip_totemip_to_sockaddr_convert(system_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); msg_ucast.msg_name = &sockaddr; msg_ucast.msg_namelen = addrlen; msg_ucast.msg_iov = (void *)&iovec; msg_ucast.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_ucast.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_ucast.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_ucast.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_ucast.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_ucast.msg_accrightslen = 0; #endif /* * Transmit unicast message * An error here is recovered by totemsrp */ res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_ucast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "sendmsg(ucast) failed (non-critical)"); } } static inline void mcast_sendmsg ( struct totemudp_instance *instance, const void *msg, unsigned int msg_len) { struct msghdr msg_mcast; int res = 0; size_t buf_out_len; unsigned char buf_out[FRAME_SIZE_MAX]; struct iovec iovec; struct sockaddr_storage sockaddr; int addrlen; /* * Encrypt and digest the message */ if (crypto_encrypt_and_sign ( instance->crypto_inst, (const unsigned char *)msg, msg_len, buf_out, &buf_out_len) != 0) { log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)"); return; } iovec.iov_base = (void *)&buf_out; iovec.iov_len = buf_out_len; /* * Build multicast message */ totemip_totemip_to_sockaddr_convert(&instance->mcast_address, instance->totem_interface->ip_port, &sockaddr, &addrlen); memset(&msg_mcast, 0, sizeof(msg_mcast)); msg_mcast.msg_name = &sockaddr; msg_mcast.msg_namelen = addrlen; msg_mcast.msg_iov = (void *)&iovec; msg_mcast.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_mcast.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_mcast.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_mcast.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_mcast.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_mcast.msg_accrightslen = 0; #endif /* * Transmit multicast message * An error here is recovered by totemsrp */ res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_mcast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "sendmsg(mcast) failed (non-critical)"); instance->stats->continuous_sendmsg_failures++; } else { instance->stats->continuous_sendmsg_failures = 0; } /* * Transmit multicast message to local unix mcast loop * An error here is recovered by totemsrp */ msg_mcast.msg_name = NULL; msg_mcast.msg_namelen = 0; res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "sendmsg(local mcast loop) failed (non-critical)"); } } int totemudp_finalize ( void *udp_context) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; if (instance->totemudp_sockets.mcast_recv > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.mcast_recv); close (instance->totemudp_sockets.mcast_recv); } if (instance->totemudp_sockets.mcast_send > 0) { close (instance->totemudp_sockets.mcast_send); } if (instance->totemudp_sockets.local_mcast_loop[0] > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.local_mcast_loop[0]); close (instance->totemudp_sockets.local_mcast_loop[0]); close (instance->totemudp_sockets.local_mcast_loop[1]); } if (instance->totemudp_sockets.token > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.token); close (instance->totemudp_sockets.token); } return (res); } /* * Only designed to work with a message with one iov */ static int net_deliver_fn ( int fd, int revents, void *data) { struct totemudp_instance *instance = (struct totemudp_instance *)data; struct msghdr msg_recv; struct iovec *iovec; struct sockaddr_storage system_from; int bytes_received; int res = 0; char *message_type; if (instance->flushing == 1) { iovec = &instance->totemudp_iov_recv_flush; } else { iovec = &instance->totemudp_iov_recv; } /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = iovec; msg_recv.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv.msg_accrightslen = 0; #endif bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (bytes_received == -1) { return (0); } else { instance->stats_recv += bytes_received; } /* * Authenticate and if authenticated, decrypt datagram */ res = crypto_authenticate_and_decrypt (instance->crypto_inst, iovec->iov_base, &bytes_received); if (res == -1) { log_printf (instance->totemudp_log_level_security, "Received message has invalid digest... ignoring."); log_printf (instance->totemudp_log_level_security, "Invalid packet data"); iovec->iov_len = FRAME_SIZE_MAX; return 0; } iovec->iov_len = bytes_received; /* * Drop all non-mcast messages (more specifically join * messages should be dropped) */ message_type = (char *)iovec->iov_base; if (instance->flushing == 1 && *message_type == MESSAGE_TYPE_MEMB_JOIN) { iovec->iov_len = FRAME_SIZE_MAX; return (0); } /* * Handle incoming message */ instance->totemudp_deliver_fn ( instance->context, iovec->iov_base, iovec->iov_len); iovec->iov_len = FRAME_SIZE_MAX; return (0); } static int netif_determine ( struct totemudp_instance *instance, struct totem_ip_address *bindnet, struct totem_ip_address *bound_to, int *interface_up, int *interface_num) { int res; res = totemip_iface_check (bindnet, bound_to, interface_up, interface_num, instance->totem_config->clear_node_high_bit); return (res); } /* * If the interface is up, the sockets for totem are built. If the interface is down * this function is requeued in the timer list to retry building the sockets later. */ static void timer_function_netif_check_timeout ( void *data) { struct totemudp_instance *instance = (struct totemudp_instance *)data; int interface_up; int interface_num; struct totem_ip_address *bind_address; /* * Build sockets for every interface */ netif_determine (instance, &instance->totem_interface->bindnet, &instance->totem_interface->boundto, &interface_up, &interface_num); /* * If the network interface isn't back up and we are already * in loopback mode, add timer to check again and return */ if ((instance->netif_bind_state == BIND_STATE_LOOPBACK && interface_up == 0) || (instance->my_memb_entries == 1 && instance->netif_bind_state == BIND_STATE_REGULAR && interface_up == 1)) { qb_loop_timer_add (instance->totemudp_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); /* * Add a timer to check for a downed regular interface */ return; } if (instance->totemudp_sockets.mcast_recv > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.mcast_recv); close (instance->totemudp_sockets.mcast_recv); } if (instance->totemudp_sockets.mcast_send > 0) { close (instance->totemudp_sockets.mcast_send); } if (instance->totemudp_sockets.local_mcast_loop[0] > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.local_mcast_loop[0]); close (instance->totemudp_sockets.local_mcast_loop[0]); close (instance->totemudp_sockets.local_mcast_loop[1]); } if (instance->totemudp_sockets.token > 0) { qb_loop_poll_del (instance->totemudp_poll_handle, instance->totemudp_sockets.token); close (instance->totemudp_sockets.token); } if (interface_up == 0) { /* * Interface is not up */ instance->netif_bind_state = BIND_STATE_LOOPBACK; bind_address = &localhost; /* * Add a timer to retry building interfaces and request memb_gather_enter */ qb_loop_timer_add (instance->totemudp_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } else { /* * Interface is up */ instance->netif_bind_state = BIND_STATE_REGULAR; bind_address = &instance->totem_interface->bindnet; } /* * Create and bind the multicast and unicast sockets */ (void)totemudp_build_sockets (instance, &instance->mcast_address, bind_address, &instance->totemudp_sockets, &instance->totem_interface->boundto); qb_loop_poll_add ( instance->totemudp_poll_handle, QB_LOOP_MED, instance->totemudp_sockets.mcast_recv, POLLIN, instance, net_deliver_fn); qb_loop_poll_add ( instance->totemudp_poll_handle, QB_LOOP_MED, instance->totemudp_sockets.local_mcast_loop[0], POLLIN, instance, net_deliver_fn); qb_loop_poll_add ( instance->totemudp_poll_handle, QB_LOOP_MED, instance->totemudp_sockets.token, POLLIN, instance, net_deliver_fn); totemip_copy (&instance->my_id, &instance->totem_interface->boundto); /* * This reports changes in the interface to the user and totemsrp */ if (instance->netif_bind_state == BIND_STATE_REGULAR) { if (instance->netif_state_report & NETIF_STATE_REPORT_UP) { log_printf (instance->totemudp_log_level_notice, "The network interface [%s] is now up.", totemip_print (&instance->totem_interface->boundto)); instance->netif_state_report = NETIF_STATE_REPORT_DOWN; instance->totemudp_iface_change_fn (instance->context, &instance->my_id); } /* * Add a timer to check for interface going down in single membership */ if (instance->my_memb_entries == 1) { qb_loop_timer_add (instance->totemudp_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } } else { if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) { log_printf (instance->totemudp_log_level_notice, "The network interface is down."); instance->totemudp_iface_change_fn (instance->context, &instance->my_id); } instance->netif_state_report = NETIF_STATE_REPORT_UP; } } /* Set the socket priority to INTERACTIVE to ensure that our messages don't get queued behind anything else */ static void totemudp_traffic_control_set(struct totemudp_instance *instance, int sock) { #ifdef SO_PRIORITY int prio = 6; /* TC_PRIO_INTERACTIVE */ if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set traffic priority"); } #endif } static int totemudp_build_sockets_ip ( struct totemudp_instance *instance, struct totem_ip_address *mcast_address, struct totem_ip_address *bindnet_address, struct totemudp_socket *sockets, struct totem_ip_address *bound_to, int interface_num) { struct sockaddr_storage sockaddr; struct ipv6_mreq mreq6; struct ip_mreq mreq; struct sockaddr_storage mcast_ss, boundto_ss; struct sockaddr_in6 *mcast_sin6 = (struct sockaddr_in6 *)&mcast_ss; struct sockaddr_in *mcast_sin = (struct sockaddr_in *)&mcast_ss; struct sockaddr_in *boundto_sin = (struct sockaddr_in *)&boundto_ss; unsigned int sendbuf_size; unsigned int recvbuf_size; unsigned int optlen = sizeof (sendbuf_size); int addrlen; int res; int flag; uint8_t sflag; int i; /* * Create multicast recv socket */ sockets->mcast_recv = socket (bindnet_address->family, SOCK_DGRAM, 0); if (sockets->mcast_recv == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "socket() failed"); return (-1); } totemip_nosigpipe (sockets->mcast_recv); res = fcntl (sockets->mcast_recv, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set non-blocking operation on multicast socket"); return (-1); } /* * Force reuse */ flag = 1; if ( setsockopt(sockets->mcast_recv, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "setsockopt(SO_REUSEADDR) failed"); return (-1); } /* * Bind to multicast socket used for multicast receives */ totemip_totemip_to_sockaddr_convert(mcast_address, instance->totem_interface->ip_port, &sockaddr, &addrlen); res = bind (sockets->mcast_recv, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Unable to bind the socket to receive multicast packets"); return (-1); } /* * Create local multicast loop socket */ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "socket() failed"); return (-1); } for (i = 0; i < 2; i++) { totemip_nosigpipe (sockets->local_mcast_loop[i]); res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set non-blocking operation on multicast socket"); return (-1); } } /* * Setup mcast send socket */ sockets->mcast_send = socket (bindnet_address->family, SOCK_DGRAM, 0); if (sockets->mcast_send == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "socket() failed"); return (-1); } totemip_nosigpipe (sockets->mcast_send); res = fcntl (sockets->mcast_send, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set non-blocking operation on multicast socket"); return (-1); } /* * Force reuse */ flag = 1; if ( setsockopt(sockets->mcast_send, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "setsockopt(SO_REUSEADDR) failed"); return (-1); } totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port - 1, &sockaddr, &addrlen); res = bind (sockets->mcast_send, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Unable to bind the socket to send multicast packets"); return (-1); } /* * Setup unicast socket */ sockets->token = socket (bindnet_address->family, SOCK_DGRAM, 0); if (sockets->token == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "socket() failed"); return (-1); } totemip_nosigpipe (sockets->token); res = fcntl (sockets->token, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Could not set non-blocking operation on token socket"); return (-1); } /* * Force reuse */ flag = 1; if ( setsockopt(sockets->token, SOL_SOCKET, SO_REUSEADDR, (char *)&flag, sizeof (flag)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "setsockopt(SO_REUSEADDR) failed"); return (-1); } /* * Bind to unicast socket used for token send/receives * This has the side effect of binding to the correct interface */ totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); res = bind (sockets->token, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Unable to bind UDP unicast socket"); return (-1); } recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; /* * Set buffer sizes to avoid overruns */ res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "Unable to set SO_RCVBUF size on UDP mcast socket"); return (-1); } res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "Unable to set SO_SNDBUF size on UDP mcast socket"); return (-1); } res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "Unable to set SO_RCVBUF size on UDP local mcast loop socket"); return (-1); } res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_debug, "Unable to set SO_SNDBUF size on UDP local mcast loop socket"); return (-1); } res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen); if (res == 0) { log_printf (instance->totemudp_log_level_debug, "Receive multicast socket recv buffer size (%d bytes).", recvbuf_size); } res = getsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen); if (res == 0) { log_printf (instance->totemudp_log_level_debug, "Transmit multicast socket send buffer size (%d bytes).", sendbuf_size); } res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen); if (res == 0) { log_printf (instance->totemudp_log_level_debug, "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size); } res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen); if (res == 0) { log_printf (instance->totemudp_log_level_debug, "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size); } /* * Join group membership on socket */ totemip_totemip_to_sockaddr_convert(mcast_address, instance->totem_interface->ip_port, &mcast_ss, &addrlen); totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &boundto_ss, &addrlen); if (instance->totem_config->broadcast_use == 1) { unsigned int broadcast = 1; if ((setsockopt(sockets->mcast_recv, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "setting broadcast option failed"); return (-1); } if ((setsockopt(sockets->mcast_send, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "setting broadcast option failed"); return (-1); } } else { switch (bindnet_address->family) { case AF_INET: memset(&mreq, 0, sizeof(mreq)); mreq.imr_multiaddr.s_addr = mcast_sin->sin_addr.s_addr; mreq.imr_interface.s_addr = boundto_sin->sin_addr.s_addr; res = setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "join ipv4 multicast group failed"); return (-1); } break; case AF_INET6: memset(&mreq6, 0, sizeof(mreq6)); memcpy(&mreq6.ipv6mr_multiaddr, &mcast_sin6->sin6_addr, sizeof(struct in6_addr)); mreq6.ipv6mr_interface = interface_num; res = setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof (mreq6)); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "join ipv6 multicast group failed"); return (-1); } break; } } /* * Turn off multicast loopback */ flag = 0; switch ( bindnet_address->family ) { case AF_INET: sflag = 0; res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP, &sflag, sizeof (sflag)); break; case AF_INET6: res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &flag, sizeof (flag)); } if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "Unable to turn off multicast loopback"); return (-1); } /* * Set multicast packets TTL */ flag = instance->totem_interface->ttl; if (bindnet_address->family == AF_INET6) { res = setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &flag, sizeof (flag)); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "set mcast v6 TTL failed"); return (-1); } } else { sflag = flag; res = setsockopt(sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_TTL, &sflag, sizeof(sflag)); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "set mcast v4 TTL failed"); return (-1); } } /* * Bind to a specific interface for multicast send and receive */ switch ( bindnet_address->family ) { case AF_INET: if (setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_IF, &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "cannot select interface for multicast packets (send)"); return (-1); } if (setsockopt (sockets->mcast_recv, IPPROTO_IP, IP_MULTICAST_IF, &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "cannot select interface for multicast packets (recv)"); return (-1); } break; case AF_INET6: if (setsockopt (sockets->mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interface_num, sizeof (interface_num)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "cannot select interface for multicast packets (send v6)"); return (-1); } if (setsockopt (sockets->mcast_recv, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interface_num, sizeof (interface_num)) < 0) { LOGSYS_PERROR (errno, instance->totemudp_log_level_warning, "cannot select interface for multicast packets (recv v6)"); return (-1); } break; } return 0; } static int totemudp_build_sockets ( struct totemudp_instance *instance, struct totem_ip_address *mcast_address, struct totem_ip_address *bindnet_address, struct totemudp_socket *sockets, struct totem_ip_address *bound_to) { int interface_num; int interface_up; int res; /* * Determine the ip address bound to and the interface name */ res = netif_determine (instance, bindnet_address, bound_to, &interface_up, &interface_num); if (res == -1) { return (-1); } totemip_copy(&instance->my_id, bound_to); res = totemudp_build_sockets_ip (instance, mcast_address, bindnet_address, sockets, bound_to, interface_num); /* We only send out of the token socket */ totemudp_traffic_control_set(instance, sockets->token); return res; } /* * Totem Network interface - also does encryption/decryption * depends on poll abstraction, POSIX, IPV4 */ /* * Create an instance */ int totemudp_initialize ( qb_loop_t *poll_handle, void **udp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, int interface_no, void *context, void (*deliver_fn) ( void *context, const void *msg, unsigned int msg_len), void (*iface_change_fn) ( void *context, const struct totem_ip_address *iface_address), void (*target_set_completed) ( void *context)) { struct totemudp_instance *instance; instance = malloc (sizeof (struct totemudp_instance)); if (instance == NULL) { return (-1); } totemudp_instance_initialize (instance); instance->totem_config = totem_config; instance->stats = stats; /* * Configure logging */ instance->totemudp_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security; instance->totemudp_log_level_error = totem_config->totem_logging_configuration.log_level_error; instance->totemudp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; instance->totemudp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; instance->totemudp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; instance->totemudp_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; instance->totemudp_log_printf = totem_config->totem_logging_configuration.log_printf; /* * Initialize random number generator for later use to generate salt */ instance->crypto_inst = crypto_init (totem_config->private_key, totem_config->private_key_len, totem_config->crypto_cipher_type, totem_config->crypto_hash_type, instance->totemudp_log_printf, instance->totemudp_log_level_security, instance->totemudp_log_level_notice, instance->totemudp_log_level_error, instance->totemudp_subsys_id); if (instance->crypto_inst == NULL) { free(instance); return (-1); } /* * Initialize local variables for totemudp */ instance->totem_interface = &totem_config->interfaces[interface_no]; totemip_copy (&instance->mcast_address, &instance->totem_interface->mcast_addr); memset (instance->iov_buffer, 0, FRAME_SIZE_MAX); instance->totemudp_poll_handle = poll_handle; instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id; instance->context = context; instance->totemudp_deliver_fn = deliver_fn; instance->totemudp_iface_change_fn = iface_change_fn; instance->totemudp_target_set_completed = target_set_completed; totemip_localhost (instance->mcast_address.family, &localhost); localhost.nodeid = instance->totem_config->node_id; /* * RRP layer isn't ready to receive message because it hasn't * initialized yet. Add short timer to check the interfaces. */ qb_loop_timer_add (instance->totemudp_poll_handle, QB_LOOP_MED, 100*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); *udp_context = instance; return (0); } void *totemudp_buffer_alloc (void) { return malloc (FRAME_SIZE_MAX); } void totemudp_buffer_release (void *ptr) { return free (ptr); } int totemudp_processor_count_set ( void *udp_context, int processor_count) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; instance->my_memb_entries = processor_count; qb_loop_timer_del (instance->totemudp_poll_handle, instance->timer_netif_check_timeout); if (processor_count == 1) { qb_loop_timer_add (instance->totemudp_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } return (res); } int totemudp_recv_flush (void *udp_context) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; struct pollfd ufd; int nfds; int res = 0; int i; int sock; instance->flushing = 1; for (i = 0; i < 2; i++) { sock = -1; if (i == 0) { sock = instance->totemudp_sockets.mcast_recv; } if (i == 1) { sock = instance->totemudp_sockets.local_mcast_loop[0]; } assert(sock != -1); do { ufd.fd = sock; ufd.events = POLLIN; nfds = poll (&ufd, 1, 0); if (nfds == 1 && ufd.revents & POLLIN) { net_deliver_fn (sock, ufd.revents, instance); } } while (nfds == 1); } instance->flushing = 0; return (res); } int totemudp_send_flush (void *udp_context) { return 0; } int totemudp_token_send ( void *udp_context, const void *msg, unsigned int msg_len) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; ucast_sendmsg (instance, &instance->token_target, msg, msg_len); return (res); } int totemudp_mcast_flush_send ( void *udp_context, const void *msg, unsigned int msg_len) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; mcast_sendmsg (instance, msg, msg_len); return (res); } int totemudp_mcast_noflush_send ( void *udp_context, const void *msg, unsigned int msg_len) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; mcast_sendmsg (instance, msg, msg_len); return (res); } extern int totemudp_iface_check (void *udp_context) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; timer_function_netif_check_timeout (instance); return (res); } extern void totemudp_net_mtu_adjust (void *udp_context, struct totem_config *totem_config) { -#define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */ + + assert(totem_config->interface_count > 0); + totem_config->net_mtu -= crypto_sec_header_size(totem_config->crypto_cipher_type, totem_config->crypto_hash_type) + - UDPIP_HEADER_SIZE; + totemip_udpip_header_size(totem_config->interfaces[0].bindnet.family); } const char *totemudp_iface_print (void *udp_context) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; const char *ret_char; ret_char = totemip_print (&instance->my_id); return (ret_char); } int totemudp_iface_get ( void *udp_context, struct totem_ip_address *addr) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; memcpy (addr, &instance->my_id, sizeof (struct totem_ip_address)); return (res); } int totemudp_token_target_set ( void *udp_context, const struct totem_ip_address *token_target) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; int res = 0; memcpy (&instance->token_target, token_target, sizeof (struct totem_ip_address)); instance->totemudp_target_set_completed (instance->context); return (res); } extern int totemudp_recv_mcast_empty ( void *udp_context) { struct totemudp_instance *instance = (struct totemudp_instance *)udp_context; unsigned int res; struct sockaddr_storage system_from; struct msghdr msg_recv; struct pollfd ufd; int nfds; int msg_processed = 0; int i; int sock; /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = &instance->totemudp_iov_recv_flush; msg_recv.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv.msg_accrightslen = 0; #endif for (i = 0; i < 2; i++) { sock = -1; if (i == 0) { sock = instance->totemudp_sockets.mcast_recv; } if (i == 1) { sock = instance->totemudp_sockets.local_mcast_loop[0]; } assert(sock != -1); do { ufd.fd = sock; ufd.events = POLLIN; nfds = poll (&ufd, 1, 0); if (nfds == 1 && ufd.revents & POLLIN) { res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (res != -1) { msg_processed = 1; } else { msg_processed = -1; } } } while (nfds == 1); } return (msg_processed); } diff --git a/exec/totemudpu.c b/exec/totemudpu.c index 69837c77..037f82b4 100644 --- a/exec/totemudpu.c +++ b/exec/totemudpu.c @@ -1,1284 +1,1286 @@ /* * Copyright (c) 2005 MontaVista Software, Inc. * Copyright (c) 2006-2012 Red Hat, Inc. * * All rights reserved. * * Author: Steven Dake (sdake@redhat.com) * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define LOGSYS_UTILS_ONLY 1 #include #include "totemudpu.h" #include "util.h" #include "totemcrypto.h" #include #include #include #include #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) #define NETIF_STATE_REPORT_UP 1 #define NETIF_STATE_REPORT_DOWN 2 #define BIND_STATE_UNBOUND 0 #define BIND_STATE_REGULAR 1 #define BIND_STATE_LOOPBACK 2 struct totemudpu_member { struct list_head list; struct totem_ip_address member; int fd; int active; }; struct totemudpu_instance { struct crypto_instance *crypto_inst; qb_loop_t *totemudpu_poll_handle; struct totem_interface *totem_interface; int netif_state_report; int netif_bind_state; void *context; void (*totemudpu_deliver_fn) ( void *context, const void *msg, unsigned int msg_len); void (*totemudpu_iface_change_fn) ( void *context, const struct totem_ip_address *iface_address); void (*totemudpu_target_set_completed) (void *context); /* * Function and data used to log messages */ int totemudpu_log_level_security; int totemudpu_log_level_error; int totemudpu_log_level_warning; int totemudpu_log_level_notice; int totemudpu_log_level_debug; int totemudpu_subsys_id; void (*totemudpu_log_printf) ( int level, int subsys, const char *function, const char *file, int line, const char *format, ...)__attribute__((format(printf, 6, 7))); void *udpu_context; char iov_buffer[FRAME_SIZE_MAX]; struct iovec totemudpu_iov_recv; struct list_head member_list; int stats_sent; int stats_recv; int stats_delv; int stats_remcasts; int stats_orf_token; struct timeval stats_tv_start; struct totem_ip_address my_id; int firstrun; qb_loop_timer_handle timer_netif_check_timeout; unsigned int my_memb_entries; struct totem_config *totem_config; totemsrp_stats_t *stats; struct totem_ip_address token_target; int token_socket; qb_loop_timer_handle timer_merge_detect_timeout; int send_merge_detect_message; unsigned int merge_detect_messages_sent_before_timeout; }; struct work_item { const void *msg; unsigned int msg_len; struct totemudpu_instance *instance; }; static int totemudpu_build_sockets ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to); static int totemudpu_create_sending_socket( void *udpu_context, const struct totem_ip_address *member); int totemudpu_member_list_rebind_ip ( void *udpu_context); static void totemudpu_start_merge_detect_timeout( void *udpu_context); static void totemudpu_stop_merge_detect_timeout( void *udpu_context); static struct totem_ip_address localhost; static void totemudpu_instance_initialize (struct totemudpu_instance *instance) { memset (instance, 0, sizeof (struct totemudpu_instance)); instance->netif_state_report = NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN; instance->totemudpu_iov_recv.iov_base = instance->iov_buffer; instance->totemudpu_iov_recv.iov_len = FRAME_SIZE_MAX; //sizeof (instance->iov_buffer); /* * There is always atleast 1 processor */ instance->my_memb_entries = 1; list_init (&instance->member_list); } #define log_printf(level, format, args...) \ do { \ instance->totemudpu_log_printf ( \ level, instance->totemudpu_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ (const char *)format, ##args); \ } while (0); #define LOGSYS_PERROR(err_num, level, fmt, args...) \ do { \ char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ instance->totemudpu_log_printf ( \ level, instance->totemudpu_subsys_id, \ __FUNCTION__, __FILE__, __LINE__, \ fmt ": %s (%d)", ##args, _error_ptr, err_num); \ } while(0) int totemudpu_crypto_set ( void *udpu_context, const char *cipher_type, const char *hash_type) { return (0); } static inline void ucast_sendmsg ( struct totemudpu_instance *instance, struct totem_ip_address *system_to, const void *msg, unsigned int msg_len) { struct msghdr msg_ucast; int res = 0; size_t buf_out_len; unsigned char buf_out[FRAME_SIZE_MAX]; struct sockaddr_storage sockaddr; struct iovec iovec; int addrlen; /* * Encrypt and digest the message */ if (crypto_encrypt_and_sign ( instance->crypto_inst, (const unsigned char *)msg, msg_len, buf_out, &buf_out_len) != 0) { log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)"); return; } iovec.iov_base = (void *)buf_out; iovec.iov_len = buf_out_len; /* * Build unicast message */ totemip_totemip_to_sockaddr_convert(system_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); memset(&msg_ucast, 0, sizeof(msg_ucast)); msg_ucast.msg_name = &sockaddr; msg_ucast.msg_namelen = addrlen; msg_ucast.msg_iov = (void *)&iovec; msg_ucast.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_ucast.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_ucast.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_ucast.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_ucast.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_ucast.msg_accrightslen = 0; #endif /* * Transmit unicast message * An error here is recovered by totemsrp */ res = sendmsg (instance->token_socket, &msg_ucast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, "sendmsg(ucast) failed (non-critical)"); } } static inline void mcast_sendmsg ( struct totemudpu_instance *instance, const void *msg, unsigned int msg_len, int only_active) { struct msghdr msg_mcast; int res = 0; size_t buf_out_len; unsigned char buf_out[FRAME_SIZE_MAX]; struct iovec iovec; struct sockaddr_storage sockaddr; int addrlen; struct list_head *list; struct totemudpu_member *member; /* * Encrypt and digest the message */ if (crypto_encrypt_and_sign ( instance->crypto_inst, (const unsigned char *)msg, msg_len, buf_out, &buf_out_len) != 0) { log_printf(LOGSYS_LEVEL_CRIT, "Error encrypting/signing packet (non-critical)"); return; } iovec.iov_base = (void *)buf_out; iovec.iov_len = buf_out_len; memset(&msg_mcast, 0, sizeof(msg_mcast)); /* * Build multicast message */ for (list = instance->member_list.next; list != &instance->member_list; list = list->next) { member = list_entry (list, struct totemudpu_member, list); /* * Do not send multicast message if message is not "flush", member * is inactive and timeout for sending merge message didn't expired. */ if (only_active && !member->active && !instance->send_merge_detect_message) continue ; totemip_totemip_to_sockaddr_convert(&member->member, instance->totem_interface->ip_port, &sockaddr, &addrlen); msg_mcast.msg_name = &sockaddr; msg_mcast.msg_namelen = addrlen; msg_mcast.msg_iov = (void *)&iovec; msg_mcast.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_mcast.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_mcast.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_mcast.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_mcast.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_mcast.msg_accrightslen = 0; #endif /* * Transmit multicast message * An error here is recovered by totemsrp */ res = sendmsg (member->fd, &msg_mcast, MSG_NOSIGNAL); if (res < 0) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_debug, "sendmsg(mcast) failed (non-critical)"); } } if (!only_active || instance->send_merge_detect_message) { /* * Current message was sent to all nodes */ instance->merge_detect_messages_sent_before_timeout++; instance->send_merge_detect_message = 0; } } int totemudpu_finalize ( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; if (instance->token_socket > 0) { qb_loop_poll_del (instance->totemudpu_poll_handle, instance->token_socket); close (instance->token_socket); } totemudpu_stop_merge_detect_timeout(instance); return (res); } static int net_deliver_fn ( int fd, int revents, void *data) { struct totemudpu_instance *instance = (struct totemudpu_instance *)data; struct msghdr msg_recv; struct iovec *iovec; struct sockaddr_storage system_from; int bytes_received; int res = 0; iovec = &instance->totemudpu_iov_recv; /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = iovec; msg_recv.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv.msg_accrightslen = 0; #endif bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (bytes_received == -1) { return (0); } else { instance->stats_recv += bytes_received; } /* * Authenticate and if authenticated, decrypt datagram */ res = crypto_authenticate_and_decrypt (instance->crypto_inst, iovec->iov_base, &bytes_received); if (res == -1) { log_printf (instance->totemudpu_log_level_security, "Received message has invalid digest... ignoring."); log_printf (instance->totemudpu_log_level_security, "Invalid packet data"); iovec->iov_len = FRAME_SIZE_MAX; return 0; } iovec->iov_len = bytes_received; /* * Handle incoming message */ instance->totemudpu_deliver_fn ( instance->context, iovec->iov_base, iovec->iov_len); iovec->iov_len = FRAME_SIZE_MAX; return (0); } static int netif_determine ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet, struct totem_ip_address *bound_to, int *interface_up, int *interface_num) { int res; res = totemip_iface_check (bindnet, bound_to, interface_up, interface_num, instance->totem_config->clear_node_high_bit); return (res); } /* * If the interface is up, the sockets for totem are built. If the interface is down * this function is requeued in the timer list to retry building the sockets later. */ static void timer_function_netif_check_timeout ( void *data) { struct totemudpu_instance *instance = (struct totemudpu_instance *)data; int interface_up; int interface_num; struct totem_ip_address *bind_address; /* * Build sockets for every interface */ netif_determine (instance, &instance->totem_interface->bindnet, &instance->totem_interface->boundto, &interface_up, &interface_num); /* * If the network interface isn't back up and we are already * in loopback mode, add timer to check again and return */ if ((instance->netif_bind_state == BIND_STATE_LOOPBACK && interface_up == 0) || (instance->my_memb_entries == 1 && instance->netif_bind_state == BIND_STATE_REGULAR && interface_up == 1)) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); /* * Add a timer to check for a downed regular interface */ return; } if (instance->token_socket > 0) { qb_loop_poll_del (instance->totemudpu_poll_handle, instance->token_socket); close (instance->token_socket); } if (interface_up == 0) { /* * Interface is not up */ instance->netif_bind_state = BIND_STATE_LOOPBACK; bind_address = &localhost; /* * Add a timer to retry building interfaces and request memb_gather_enter */ qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } else { /* * Interface is up */ instance->netif_bind_state = BIND_STATE_REGULAR; bind_address = &instance->totem_interface->bindnet; } /* * Create and bind the multicast and unicast sockets */ totemudpu_build_sockets (instance, bind_address, &instance->totem_interface->boundto); qb_loop_poll_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->token_socket, POLLIN, instance, net_deliver_fn); totemip_copy (&instance->my_id, &instance->totem_interface->boundto); /* * This reports changes in the interface to the user and totemsrp */ if (instance->netif_bind_state == BIND_STATE_REGULAR) { if (instance->netif_state_report & NETIF_STATE_REPORT_UP) { log_printf (instance->totemudpu_log_level_notice, "The network interface [%s] is now up.", totemip_print (&instance->totem_interface->boundto)); instance->netif_state_report = NETIF_STATE_REPORT_DOWN; instance->totemudpu_iface_change_fn (instance->context, &instance->my_id); } /* * Add a timer to check for interface going down in single membership */ if (instance->my_memb_entries == 1) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } } else { if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) { log_printf (instance->totemudpu_log_level_notice, "The network interface is down."); instance->totemudpu_iface_change_fn (instance->context, &instance->my_id); } instance->netif_state_report = NETIF_STATE_REPORT_UP; } } /* Set the socket priority to INTERACTIVE to ensure that our messages don't get queued behind anything else */ static void totemudpu_traffic_control_set(struct totemudpu_instance *instance, int sock) { #ifdef SO_PRIORITY int prio = 6; /* TC_PRIO_INTERACTIVE */ if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(int))) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set traffic priority"); } #endif } static int totemudpu_build_sockets_ip ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to, int interface_num) { struct sockaddr_storage sockaddr; int addrlen; int res; unsigned int recvbuf_size; unsigned int optlen = sizeof (recvbuf_size); /* * Setup unicast socket */ instance->token_socket = socket (bindnet_address->family, SOCK_DGRAM, 0); if (instance->token_socket == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "socket() failed"); return (-1); } totemip_nosigpipe (instance->token_socket); res = fcntl (instance->token_socket, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set non-blocking operation on token socket"); return (-1); } /* * Bind to unicast socket used for token send/receives * This has the side effect of binding to the correct interface */ totemip_totemip_to_sockaddr_convert(bound_to, instance->totem_interface->ip_port, &sockaddr, &addrlen); res = bind (instance->token_socket, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "bind token socket failed"); return (-1); } /* * the token_socket can receive many messages. Allow a large number * of receive messages on this socket */ recvbuf_size = MCAST_SOCKET_BUFFER_SIZE; res = setsockopt (instance->token_socket, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, "Could not set recvbuf size"); } return 0; } static int totemudpu_build_sockets ( struct totemudpu_instance *instance, struct totem_ip_address *bindnet_address, struct totem_ip_address *bound_to) { int interface_num; int interface_up; int res; /* * Determine the ip address bound to and the interface name */ res = netif_determine (instance, bindnet_address, bound_to, &interface_up, &interface_num); if (res == -1) { return (-1); } totemip_copy(&instance->my_id, bound_to); res = totemudpu_build_sockets_ip (instance, bindnet_address, bound_to, interface_num); /* We only send out of the token socket */ totemudpu_traffic_control_set(instance, instance->token_socket); /* * Rebind all members to new ips */ totemudpu_member_list_rebind_ip(instance); return res; } /* * Totem Network interface - also does encryption/decryption * depends on poll abstraction, POSIX, IPV4 */ /* * Create an instance */ int totemudpu_initialize ( qb_loop_t *poll_handle, void **udpu_context, struct totem_config *totem_config, totemsrp_stats_t *stats, int interface_no, void *context, void (*deliver_fn) ( void *context, const void *msg, unsigned int msg_len), void (*iface_change_fn) ( void *context, const struct totem_ip_address *iface_address), void (*target_set_completed) ( void *context)) { struct totemudpu_instance *instance; instance = malloc (sizeof (struct totemudpu_instance)); if (instance == NULL) { return (-1); } totemudpu_instance_initialize (instance); instance->totem_config = totem_config; instance->stats = stats; /* * Configure logging */ instance->totemudpu_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security; instance->totemudpu_log_level_error = totem_config->totem_logging_configuration.log_level_error; instance->totemudpu_log_level_warning = totem_config->totem_logging_configuration.log_level_warning; instance->totemudpu_log_level_notice = totem_config->totem_logging_configuration.log_level_notice; instance->totemudpu_log_level_debug = totem_config->totem_logging_configuration.log_level_debug; instance->totemudpu_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; instance->totemudpu_log_printf = totem_config->totem_logging_configuration.log_printf; /* * Initialize random number generator for later use to generate salt */ instance->crypto_inst = crypto_init (totem_config->private_key, totem_config->private_key_len, totem_config->crypto_cipher_type, totem_config->crypto_hash_type, instance->totemudpu_log_printf, instance->totemudpu_log_level_security, instance->totemudpu_log_level_notice, instance->totemudpu_log_level_error, instance->totemudpu_subsys_id); if (instance->crypto_inst == NULL) { free(instance); return (-1); } /* * Initialize local variables for totemudpu */ instance->totem_interface = &totem_config->interfaces[interface_no]; memset (instance->iov_buffer, 0, FRAME_SIZE_MAX); instance->totemudpu_poll_handle = poll_handle; instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id; instance->context = context; instance->totemudpu_deliver_fn = deliver_fn; instance->totemudpu_iface_change_fn = iface_change_fn; instance->totemudpu_target_set_completed = target_set_completed; totemip_localhost (AF_INET, &localhost); localhost.nodeid = instance->totem_config->node_id; /* * RRP layer isn't ready to receive message because it hasn't * initialized yet. Add short timer to check the interfaces. */ qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, 100*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); totemudpu_start_merge_detect_timeout(instance); *udpu_context = instance; return (0); } void *totemudpu_buffer_alloc (void) { return malloc (FRAME_SIZE_MAX); } void totemudpu_buffer_release (void *ptr) { return free (ptr); } int totemudpu_processor_count_set ( void *udpu_context, int processor_count) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; instance->my_memb_entries = processor_count; qb_loop_timer_del (instance->totemudpu_poll_handle, instance->timer_netif_check_timeout); if (processor_count == 1) { qb_loop_timer_add (instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->downcheck_timeout*QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_netif_check_timeout, &instance->timer_netif_check_timeout); } return (res); } int totemudpu_recv_flush (void *udpu_context) { int res = 0; return (res); } int totemudpu_send_flush (void *udpu_context) { int res = 0; return (res); } int totemudpu_token_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; ucast_sendmsg (instance, &instance->token_target, msg, msg_len); return (res); } int totemudpu_mcast_flush_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; mcast_sendmsg (instance, msg, msg_len, 0); return (res); } int totemudpu_mcast_noflush_send ( void *udpu_context, const void *msg, unsigned int msg_len) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; mcast_sendmsg (instance, msg, msg_len, 1); return (res); } extern int totemudpu_iface_check (void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; timer_function_netif_check_timeout (instance); return (res); } extern void totemudpu_net_mtu_adjust (void *udpu_context, struct totem_config *totem_config) { -#define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */ + + assert(totem_config->interface_count > 0); + totem_config->net_mtu -= crypto_sec_header_size(totem_config->crypto_cipher_type, totem_config->crypto_hash_type) + - UDPIP_HEADER_SIZE; + totemip_udpip_header_size(totem_config->interfaces[0].bindnet.family); } const char *totemudpu_iface_print (void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; const char *ret_char; ret_char = totemip_print (&instance->my_id); return (ret_char); } int totemudpu_iface_get ( void *udpu_context, struct totem_ip_address *addr) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; memcpy (addr, &instance->my_id, sizeof (struct totem_ip_address)); return (res); } int totemudpu_token_target_set ( void *udpu_context, const struct totem_ip_address *token_target) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int res = 0; memcpy (&instance->token_target, token_target, sizeof (struct totem_ip_address)); instance->totemudpu_target_set_completed (instance->context); return (res); } extern int totemudpu_recv_mcast_empty ( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; unsigned int res; struct sockaddr_storage system_from; struct msghdr msg_recv; struct pollfd ufd; int nfds; int msg_processed = 0; /* * Receive datagram */ msg_recv.msg_name = &system_from; msg_recv.msg_namelen = sizeof (struct sockaddr_storage); msg_recv.msg_iov = &instance->totemudpu_iov_recv; msg_recv.msg_iovlen = 1; #ifdef HAVE_MSGHDR_CONTROL msg_recv.msg_control = 0; #endif #ifdef HAVE_MSGHDR_CONTROLLEN msg_recv.msg_controllen = 0; #endif #ifdef HAVE_MSGHDR_FLAGS msg_recv.msg_flags = 0; #endif #ifdef HAVE_MSGHDR_ACCRIGHTS msg_recv.msg_accrights = NULL; #endif #ifdef HAVE_MSGHDR_ACCRIGHTSLEN msg_recv.msg_accrightslen = 0; #endif do { ufd.fd = instance->token_socket; ufd.events = POLLIN; nfds = poll (&ufd, 1, 0); if (nfds == 1 && ufd.revents & POLLIN) { res = recvmsg (instance->token_socket, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT); if (res != -1) { msg_processed = 1; } else { msg_processed = -1; } } } while (nfds == 1); return (msg_processed); } static int totemudpu_create_sending_socket( void *udpu_context, const struct totem_ip_address *member) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; int fd; int res; unsigned int sendbuf_size; unsigned int optlen = sizeof (sendbuf_size); struct sockaddr_storage sockaddr; int addrlen; fd = socket (member->family, SOCK_DGRAM, 0); if (fd == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not create socket for new member"); return (-1); } totemip_nosigpipe (fd); res = fcntl (fd, F_SETFL, O_NONBLOCK); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "Could not set non-blocking operation on token socket"); goto error_close_fd; } /* * These sockets are used to send multicast messages, so their buffers * should be large */ sendbuf_size = MCAST_SOCKET_BUFFER_SIZE; res = setsockopt (fd, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_notice, "Could not set sendbuf size"); /* * Fail in setting sendbuf size is not fatal -> don't exit */ } /* * Bind to sending interface */ totemip_totemip_to_sockaddr_convert(&instance->my_id, 0, &sockaddr, &addrlen); res = bind (fd, (struct sockaddr *)&sockaddr, addrlen); if (res == -1) { LOGSYS_PERROR (errno, instance->totemudpu_log_level_warning, "bind token socket failed"); goto error_close_fd; } return (fd); error_close_fd: close(fd); return (-1); } int totemudpu_member_add ( void *udpu_context, const struct totem_ip_address *member) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; struct totemudpu_member *new_member; new_member = malloc (sizeof (struct totemudpu_member)); if (new_member == NULL) { return (-1); } memset(new_member, 0, sizeof(*new_member)); log_printf (LOGSYS_LEVEL_NOTICE, "adding new UDPU member {%s}", totemip_print(member)); list_init (&new_member->list); list_add_tail (&new_member->list, &instance->member_list); memcpy (&new_member->member, member, sizeof (struct totem_ip_address)); new_member->fd = totemudpu_create_sending_socket(udpu_context, member); new_member->active = 0; return (0); } int totemudpu_member_remove ( void *udpu_context, const struct totem_ip_address *token_target) { int found = 0; struct list_head *list; struct totemudpu_member *member; struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; /* * Find the member to remove and close its socket */ for (list = instance->member_list.next; list != &instance->member_list; list = list->next) { member = list_entry (list, struct totemudpu_member, list); if (totemip_compare (token_target, &member->member)==0) { log_printf(LOGSYS_LEVEL_NOTICE, "removing UDPU member {%s}", totemip_print(&member->member)); if (member->fd > 0) { log_printf(LOGSYS_LEVEL_DEBUG, "Closing socket to: {%s}", totemip_print(&member->member)); qb_loop_poll_del (instance->totemudpu_poll_handle, member->fd); close (member->fd); } found = 1; break; } } /* * Delete the member from the list */ if (found) { list_del (list); } instance = NULL; return (0); } int totemudpu_member_list_rebind_ip ( void *udpu_context) { struct list_head *list; struct totemudpu_member *member; struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; for (list = instance->member_list.next; list != &instance->member_list; list = list->next) { member = list_entry (list, struct totemudpu_member, list); if (member->fd > 0) { close (member->fd); } member->fd = totemudpu_create_sending_socket(udpu_context, &member->member); } return (0); } int totemudpu_member_set_active ( void *udpu_context, const struct totem_ip_address *member_ip, int active) { struct list_head *list; struct totemudpu_member *member; int addr_found = 0; struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; /* * Find the member to set active flag */ for (list = instance->member_list.next; list != &instance->member_list; list = list->next) { member = list_entry (list, struct totemudpu_member, list); if (totemip_compare (member_ip, &member->member) == 0) { log_printf(LOGSYS_LEVEL_DEBUG, "Marking UDPU member %s %s", totemip_print(&member->member), (active ? "active" : "inactive")); member->active = active; addr_found = 1; break; } } if (!addr_found) { log_printf(LOGSYS_LEVEL_DEBUG, "Can't find UDPU member %s (should be marked as %s)", totemip_print(member_ip), (active ? "active" : "inactive")); } return (0); } static void timer_function_merge_detect_timeout ( void *data) { struct totemudpu_instance *instance = (struct totemudpu_instance *)data; if (instance->merge_detect_messages_sent_before_timeout == 0) { instance->send_merge_detect_message = 1; } instance->merge_detect_messages_sent_before_timeout = 0; totemudpu_start_merge_detect_timeout(instance); } static void totemudpu_start_merge_detect_timeout( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; qb_loop_timer_add(instance->totemudpu_poll_handle, QB_LOOP_MED, instance->totem_config->merge_timeout * 2 * QB_TIME_NS_IN_MSEC, (void *)instance, timer_function_merge_detect_timeout, &instance->timer_merge_detect_timeout); } static void totemudpu_stop_merge_detect_timeout( void *udpu_context) { struct totemudpu_instance *instance = (struct totemudpu_instance *)udpu_context; qb_loop_timer_del(instance->totemudpu_poll_handle, instance->timer_merge_detect_timeout); } diff --git a/include/corosync/totem/totemip.h b/include/corosync/totem/totemip.h index 533735a7..0168e66c 100644 --- a/include/corosync/totem/totemip.h +++ b/include/corosync/totem/totemip.h @@ -1,121 +1,123 @@ /* * Copyright (c) 2005-2010 Red Hat, Inc. * * All rights reserved. * * Author: Patrick Caulfield (pcaulfie@redhat.com) * * This software licensed under BSD license, the text of which follows: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * - Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * - Neither the name of the MontaVista Software, Inc. nor the names of its * contributors may be used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF * THE POSSIBILITY OF SUCH DAMAGE. */ /* IPv4/6 abstraction */ #ifndef TOTEMIP_H_DEFINED #define TOTEMIP_H_DEFINED #include #include #include #include #include #ifdef __cplusplus extern "C" { #endif #ifdef SO_NOSIGPIPE #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif void totemip_nosigpipe(int s); #else #define totemip_nosigpipe(s) #endif #define TOTEMIP_ADDRLEN (sizeof(struct in6_addr)) /* These are the things that get passed around */ #define TOTEM_IP_ADDRESS struct totem_ip_address { unsigned int nodeid; unsigned short family; unsigned char addr[TOTEMIP_ADDRLEN]; } __attribute__((packed)); struct totem_ip_if_address { struct totem_ip_address ip_addr; struct totem_ip_address mask_addr; int interface_up; int interface_num; char *name; struct list_head list; }; extern int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2); extern int totemip_compare(const void *a, const void *b); extern int totemip_is_mcast(struct totem_ip_address *addr); extern void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2); extern void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2); int totemip_localhost(int family, struct totem_ip_address *localhost); extern int totemip_localhost_check(const struct totem_ip_address *addr); extern const char *totemip_print(const struct totem_ip_address *addr); extern int totemip_sockaddr_to_totemip_convert(const struct sockaddr_storage *saddr, struct totem_ip_address *ip_addr); extern int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen); extern int totemip_parse(struct totem_ip_address *totemip, const char *addr, int family); extern int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit); extern int totemip_getifaddrs(struct list_head *addrs); extern void totemip_freeifaddrs(struct list_head *addrs); /* These two simulate a zero in_addr by clearing the family field */ static inline void totemip_zero_set(struct totem_ip_address *addr) { addr->family = 0; } static inline int totemip_zero_check(const struct totem_ip_address *addr) { return (addr->family == 0); } +extern size_t totemip_udpip_header_size(int family); + #ifdef __cplusplus } #endif #endif