diff --git a/libknet/bindings/rust/tests/src/bin/knet-test.rs b/libknet/bindings/rust/tests/src/bin/knet-test.rs index f27b3cd8..32f688fa 100644 --- a/libknet/bindings/rust/tests/src/bin/knet-test.rs +++ b/libknet/bindings/rust/tests/src/bin/knet-test.rs @@ -1,921 +1,920 @@ // Testing the Knet Rust APIs // // Copyright (c) 2021 Red Hat, Inc. // // All rights reserved. // // Author: Christine Caulfield (ccaulfi@redhat.com) // use knet_bindings::knet_bindings as knet; use std::net::{SocketAddr, IpAddr, Ipv4Addr}; use std::thread::spawn; use std::sync::mpsc::Receiver; use std::sync::mpsc::channel; use std::io::{Result, ErrorKind, Error}; use std::{thread, time}; const CHANNEL: i8 = 1; // Dirty C function to set the plugin path for testing (only) extern { fn set_plugin_path(knet_h: knet::Handle); } // Callbacks fn sock_notify_fn(private_data: u64, datafd: i32, channel: i8, txrx: knet::TxRx, _res: Result<()>) { println!("sock notify called for host {}, datafd: {}, channel: {}, {}", private_data, datafd, channel, txrx); } fn link_notify_fn(private_data: u64, host_id: knet::HostId, link_id: u8, connected: bool, _remote: bool, _external: bool) { println!("link status notify called ({}) for host {}, linkid: {}, connected: {}", private_data, host_id.to_u16(), link_id, connected); } fn host_notify_fn(private_data: u64, host_id: knet::HostId, connected: bool, _remote: bool, _external: bool) { println!("host status notify called ({}) for host {}, connected: {}", private_data, host_id.to_u16(), connected); } fn pmtud_fn(private_data: u64, data_mtu: u32) { println!("PMTUD notify: host {}, MTU:{} ", private_data, data_mtu); } fn onwire_fn(private_data: u64, onwire_min_ver: u8, onwire_max_ver: u8, onwire_ver: u8) { println!("Onwire ver notify for {} : {}/{}/{}", private_data, onwire_min_ver, onwire_max_ver, onwire_ver); } fn filter_fn(private_data: u64, _outdata: &[u8], txrx: knet::TxRx, this_host_id: knet::HostId, src_host_id: knet::HostId, channel: &mut i8, dst_host_ids: &mut Vec) -> knet::FilterDecision { println!("Filter ({}) called {} to {} from {}, channel: {}", private_data, txrx, this_host_id, src_host_id, channel); + let dst: u16 = (private_data & 0xFFFF) as u16; + match txrx { knet::TxRx::Tx => { - knet::FilterDecision::Multicast + dst_host_ids.push(knet::HostId::new(3-dst)); + knet::FilterDecision::Unicast } knet::TxRx::Rx => { dst_host_ids.push(this_host_id); knet::FilterDecision::Unicast } } } fn logging_thread(recvr: Receiver) { for i in &recvr { eprintln!("KNET: {}", i.msg); } eprintln!("Logging thread finished"); } fn setup_node(our_hostid: &knet::HostId, other_hostid: &knet::HostId, name: &str) -> Result { let (log_sender, log_receiver) = channel::(); spawn(move || logging_thread(log_receiver)); let knet_handle = match knet::handle_new(our_hostid, Some(log_sender), knet::LogLevel::Debug, knet::HandleFlags::NONE) { Ok(h) => h, Err(e) => { println!("Error from handle_new: {}", e); return Err(e); } }; // Make sure we use the build-tree plugins if LD_LIBRRAY_PATH points to them unsafe { set_plugin_path(knet_handle); } if let Err(e) = knet::host_add(knet_handle, other_hostid) { println!("Error from host_add: {}", e); return Err(e); } if let Err(e) = knet::host_set_name(knet_handle, other_hostid, name) { println!("Error from host_set_name: {}", e); return Err(e); } Ok(knet_handle) } // Called by the ACL tests to get a free port for a dynamic link fn setup_dynamic_link(handle: knet::Handle, hostid: &knet::HostId, link: u8, lowest_port: u16) -> Result<()> { let mut src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); for p in lowest_port..=65535 { src_addr.set_port(p); if let Err(e) = knet::link_set_config(handle, hostid, link, knet::TransportId::Udp, &src_addr, None, knet::LinkFlags::NONE) { if let Some(os_err) = e.raw_os_error() { if os_err != libc::EADDRINUSE { println!("Error from link_set_config(dyn): {}", e); return Err(e); } // In use - try the next port number } } else { println!("Dynamic link - Using port {}", p); return Ok(()) } } Err(Error::new(ErrorKind::Other, "No ports available")) } // This is the bit that configures two links on two handles that talk to each other // while also making sure they don't clash with anything else on the system fn setup_links(handle1: knet::Handle, hostid1: &knet::HostId, handle2: knet::Handle, hostid2: &knet::HostId) -> Result { let mut src_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); let mut dst_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); for p in 1025..=65534 { src_addr.set_port(p); dst_addr.set_port(p+1); if let Err(e) = knet::link_set_config(handle1, hostid2, 0, knet::TransportId::Udp, &src_addr, Some(&dst_addr), knet::LinkFlags::NONE) { if let Some(os_err) = e.raw_os_error() { if os_err != libc::EADDRINUSE { println!("Error from link_set_config(1): {}", e); return Err(e); } // In use - try the next port number } else { return Err(Error::new(ErrorKind::Other, "Error returned from link_set_config(1) was not an os_error")); } } else { // Now try the other handle if let Err(e) = knet::link_set_config(handle2, hostid1, 0, knet::TransportId::Udp, &dst_addr, Some(&src_addr), knet::LinkFlags::NONE) { if let Some(os_err) = e.raw_os_error() { if os_err != libc::EADDRINUSE { println!("Error from link_set_config(2): {}", e); return Err(e); } else { // In use - clear handle1 and try next pair of ports knet::link_clear_config(handle1, hostid2, 0)?; } } else { return Err(Error::new(ErrorKind::Other, "Error returned from link_set_config(1) was not an os_error")); } } println!("Bound to ports {} & {}",p, p+1); return Ok(p+2) } } Err(Error::new(ErrorKind::Other, "No ports available")) } // Finish configuring links fn configure_link(knet_handle: knet::Handle, our_hostid: &knet::HostId, other_hostid: &knet::HostId) -> Result<()> { if let Err(e) = knet::handle_enable_sock_notify(knet_handle, our_hostid.to_u16() as u64, Some(sock_notify_fn)) { println!("Error from handle_enable_sock_notify: {}", e); return Err(e); } if let Err(e) = knet::link_enable_status_change_notify(knet_handle, our_hostid.to_u16() as u64, Some(link_notify_fn)) { println!("Error from handle_enable_link_notify: {}", e); return Err(e); } if let Err(e) = knet::host_enable_status_change_notify(knet_handle, our_hostid.to_u16() as u64, Some(host_notify_fn)) { println!("Error from handle_enable_host_notify: {}", e); return Err(e); } if let Err(e) = knet::handle_enable_filter(knet_handle, our_hostid.to_u16() as u64, Some(filter_fn)) { println!("Error from handle_enable_filter: {}", e); return Err(e); } if let Err(e) = knet::handle_enable_pmtud_notify(knet_handle, our_hostid.to_u16() as u64, Some(pmtud_fn)) { println!("Error from handle_enable_pmtud_notify: {}", e); return Err(e); } if let Err(e) = knet::handle_enable_onwire_ver_notify(knet_handle, our_hostid.to_u16() as u64, Some(onwire_fn)) { println!("Error from handle_enable_onwire_ver_notify: {}", e); return Err(e); } match knet::handle_add_datafd(knet_handle, 0, CHANNEL) { Ok((fd,chan)) => { println!("Added datafd, fd={}, channel={}", fd, chan); }, Err(e) => { println!("Error from add_datafd: {}", e); return Err(e); } } if let Err(e) = knet::handle_crypto_rx_clear_traffic(knet_handle, knet::RxClearTraffic::Allow) { println!("Error from handle_crypto_rx_clear_traffic: {}", e); return Err(e); } if let Err(e) = knet::link_set_enable(knet_handle, other_hostid, 0, true) { println!("Error from set_link_enable(true): {}", e); return Err(e); } if let Err(e) = knet::link_set_ping_timers(knet_handle, other_hostid, 0, 500, 1000, 1024) { println!("Error from set_link_ping_timers: {}", e); return Err(e); } match knet::link_get_ping_timers(knet_handle, other_hostid, 0) { Ok((a,b,c)) => { if a != 500 || b != 1000 || c != 1024 { println!("get_link_ping_timers returned wrong values {}, {},{} (s/b 500,1000,1024)", a,b,c); return Err(Error::new(ErrorKind::Other, "Error in ping timers")); } }, Err(e) => { println!("Error from set_link_ping_timers: {}", e); return Err(e); } } if let Err(e) = knet::handle_setfwd(knet_handle, true) { println!("Error from setfwd(true): {}", e); return Err(e); } // Check status let data_fd = match knet::handle_get_datafd(knet_handle, CHANNEL) { Ok(f) => { println!("got datafd {} for channel", f); f } Err(e) => { println!("Error from handle_get_datafd: {}", e); return Err(e); } }; match knet::handle_get_channel(knet_handle, data_fd) { Ok(c) => if c != CHANNEL { println!("handle_get_channel returned wrong channel ID: {}, {}",c, CHANNEL); return Err(Error::new(ErrorKind::Other, "Error in handle_get_channel")); } Err(e) => { println!("Error from handle_get_channel: {}", e); return Err(e); } } match knet::link_get_enable(knet_handle, other_hostid, 0) { Ok(b) => if !b { println!("link not enabled (according to link_get_enable()"); }, Err(e) => { println!("Error from link_get_enable: {}", e); return Err(e); } } Ok(()) } fn recv_stuff(handle: knet::Handle, host: knet::HostId) -> Result<()> { let buf = [0u8; 1024]; loop { match knet::recv(handle, &buf, CHANNEL) { Ok(len) => { let recv_len = len as usize; if recv_len == 0 { break; // EOF?? } else { let s = String::from_utf8(buf[0..recv_len].to_vec()).unwrap(); println!("recvd on {}: {} {:?} {} ", host, recv_len, &buf[0..recv_len], s); if s == *"QUIT" { println!("got QUIT on {}, exitting", host); break; } } } Err(e) => { if e.kind() == ErrorKind::WouldBlock { thread::sleep(time::Duration::from_millis(100)); } else { println!("recv failed: {}", e); return Err(e); } } } } Ok(()) } fn close_handle(handle: knet::Handle, remnode: u16) -> Result<()> { let other_hostid = knet::HostId::new(remnode); if let Err(e) = knet::handle_setfwd(handle, false) { println!("Error from setfwd 1 (false): {}", e); return Err(e); } let data_fd = match knet::handle_get_datafd(handle, CHANNEL) { Ok(f) => { println!("got datafd {} for channel", f); f } Err(e) => { println!("Error from handle_get_datafd: {}", e); return Err(e); } }; if let Err(e) = knet::handle_remove_datafd(handle, data_fd) { println!("Error from handle_remove_datafd: {}", e); return Err(e); } if let Err(e) = knet::link_set_enable(handle, &other_hostid, 0, false) { println!("Error from set_link_enable(false): {}", e); return Err(e); } if let Err(e) = knet::link_clear_config(handle, &other_hostid, 0) { println!("clear config failed: {}", e); return Err(e); } if let Err(e) = knet::host_remove(handle, &other_hostid) { println!("host remove failed: {}", e); return Err(e); } if let Err(e) = knet::handle_free(handle) { println!("handle_free failed: {}", e); return Err(e); } Ok(()) } fn set_compress(handle: knet::Handle) -> Result<()> { let compress_config = knet::CompressConfig { compress_model: "zlib".to_string(), compress_threshold : 10, compress_level: 1, }; if let Err(e) = knet::handle_compress(handle, &compress_config) { println!("Error from handle_compress: {}", e); Err(e) } else { Ok(()) } } fn set_crypto(handle: knet::Handle) -> Result<()> { let private_key = [55u8; 2048]; // Add some crypto let crypto_config = knet::CryptoConfig { crypto_model: "openssl".to_string(), crypto_cipher_type: "aes256".to_string(), crypto_hash_type: "sha256".to_string(), private_key: &private_key, }; if let Err(e) = knet::handle_crypto_set_config(handle, &crypto_config, 1) { println!("Error from handle_crypto_set_config: {}", e); return Err(e); } if let Err(e) = knet::handle_crypto_use_config(handle, 1) { println!("Error from handle_crypto_use_config: {}", e); return Err(e); } if let Err(e) = knet::handle_crypto_rx_clear_traffic(handle, knet::RxClearTraffic::Disallow) { println!("Error from handle_crypto_rx_clear_traffic: {}", e); return Err(e); } Ok(()) } fn send_messages(handle: knet::Handle, send_quit: bool) -> Result<()> { let mut buf : [u8; 20] = [b'0'; 20]; for i in 0..10 { buf[i as usize + 1] = i + b'0'; match knet::send(handle, &buf, CHANNEL) { Ok(len) => { if len as usize != buf.len() { println!("sent {} bytes instead of {}", len, buf.len()); } }, Err(e) => { println!("send failed: {}", e); return Err(e); } } } - // Sleep to allow messages to calm down before we remove the filter - thread::sleep(time::Duration::from_millis(3000)); - if let Err(e) = knet::handle_enable_filter(handle, 0, None) { - println!("Error from handle_enable_filter (disable): {}", e); - return Err(e); - } - let s = String::from("SYNC TEST").into_bytes(); if let Err(e) = knet::send_sync(handle, &s, CHANNEL) { println!("send_sync failed: {}", e); return Err(e); } if send_quit { + // Sleep to allow messages to calm down before we tell the RX thread to quit + thread::sleep(time::Duration::from_millis(3000)); + let b = String::from("QUIT").into_bytes(); match knet::send(handle, &b, CHANNEL) { Ok(len) => { if len as usize != b.len() { println!("sent {} bytes instead of {}", len, b.len()); } }, Err(e) => { println!("send failed: {}", e); return Err(e); } } } Ok(()) } fn test_link_host_list(handle: knet::Handle) -> Result<()> { match knet::host_get_host_list(handle) { Ok(hosts) => { for i in &hosts { print!("host {}: links: ", i); match knet::link_get_link_list(handle, i) { Ok(ll) => { for j in ll { print!(" {}",j); } }, Err(e) => { println!("link_get_link_list failed: {}", e); return Err(e); } } println!(); } } Err(e) => { println!("link_get_host_list failed: {}", e); return Err(e); } } Ok(()) } // Try some metadata calls fn test_metadata_calls(handle: knet::Handle, host: &knet::HostId) -> Result<()> { if let Err(e) = knet::handle_set_threads_timer_res(handle, 190000) { println!("knet_handle_set_threads_timer_res failed: {:?}", e); return Err(e); } match knet::handle_get_threads_timer_res(handle) { Ok(v) => { if v != 190000 { println!("knet_handle_get_threads_timer_res returned wrong value {}", v); } }, Err(e) => { println!("knet_handle_set_threads_timer_res failed: {:?}", e); return Err(e); } } if let Err(e) = knet::handle_pmtud_set(handle, 1000) { println!("knet_handle_pmtud_set failed: {:?}", e); return Err(e); } match knet::handle_pmtud_get(handle) { Ok(v) => { if v != 1000 { println!("knet_handle_pmtud_get returned wrong value {} (ALLOWED)", v); // Don't fail on this, it might not have been set yet } }, Err(e) => { println!("knet_handle_pmtud_get failed: {:?}", e); return Err(e); } } if let Err(e) = knet::handle_pmtud_setfreq(handle, 1000) { println!("knet_handle_pmtud_setfreq failed: {:?}", e); return Err(e); } match knet::handle_pmtud_getfreq(handle) { Ok(v) => { if v != 1000 { println!("knet_handle_pmtud_getfreq returned wrong value {}", v); } }, Err(e) => { println!("knet_handle_pmtud_getfreq failed: {:?}", e); return Err(e); } } if let Err(e) = knet::handle_set_transport_reconnect_interval(handle, 100) { println!("knet_handle_set_transport_reconnect_interval failed: {:?}", e); return Err(e); } match knet::handle_get_transport_reconnect_interval(handle) { Ok(v) => { if v != 100 { println!("knet_handle_get_transport_reconnect_interval {}", v); } }, Err(e) => { println!("knet_handle_get_transport_reconnect_interval failed: {:?}", e); return Err(e); } } if let Err(e) = knet::link_set_pong_count(handle, host, 0, 4) { println!("knet_link_set_pong_count failed: {:?}", e); return Err(e); } match knet::link_get_pong_count(handle, host, 0) { Ok(v) => { if v != 4 { println!("knet_link_get_pong_count returned wrong value {}", v); } }, Err(e) => { println!("knet_link_get_pong_count failed: {:?}", e); return Err(e); } } if let Err(e) = knet::host_set_policy(handle, host, knet::LinkPolicy::Active) { println!("knet_host_set_policy failed: {:?}", e); return Err(e); } match knet::host_get_policy(handle, host) { Ok(v) => { if v != knet::LinkPolicy::Active { println!("knet_host_get_policy returned wrong value {}", v); } }, Err(e) => { println!("knet_host_get_policy failed: {:?}", e); return Err(e); } } if let Err(e) = knet::link_set_priority(handle, host, 0, 5) { println!("knet_link_set_priority failed: {:?}", e); return Err(e); } match knet::link_get_priority(handle, host, 0) { Ok(v) => { if v != 5 { println!("knet_link_get_priority returned wrong value {}", v); } }, Err(e) => { println!("knet_link_get_priority failed: {:?}", e); return Err(e); } } let name = match knet::host_get_name_by_host_id(handle, host) { Ok(n) => { println!("Returned host name is {}", n); n }, Err(e) => { println!("knet_host_get_name_by_host_id failed: {:?}", e); return Err(e); } }; match knet::host_get_id_by_host_name(handle, &name) { Ok(n) => { println!("Returned host id is {}", n); if n != *host { println!("Returned host id is not 2"); return Err(Error::new(ErrorKind::Other, "Error in get_id_by_host_name")); } }, Err(e) => { println!("knet_host_get_id_by_host_name failed: {:?}", e); return Err(e); } } match knet::link_get_config(handle, host, 0) { Ok((t, s, d, _f)) => { println!("Got link config: {}, {:?}, {:?}", t.to_string(),s,d); }, Err(e) => { println!("knet_link_get_config failed: {:?}", e); return Err(e); } } // Can't set this to anything different if let Err(e) = knet::handle_set_onwire_ver(handle, 1) { println!("knet_link_set_onwire_ver failed: {:?}", e); return Err(e); } match knet::handle_get_onwire_ver(handle, host) { Ok((min, max, ver)) => { println!("get_onwire_ver: Got onwire ver: {}/{}/{}", min, max, ver); }, Err(e) => { println!("knet_link_get_onwire_ver failed: {:?}", e); return Err(e); } } // Logging match knet::log_get_subsystem_name(3) { Ok(n) => println!("subsystem name for 3 is {}", n), Err(e) => { println!("knet_log_get_subsystem_name failed: {:?}", e); return Err(e); } } match knet::log_get_subsystem_id("TX") { Ok(n) => println!("subsystem ID for TX is {}", n), Err(e) => { println!("knet_log_get_subsystem_id failed: {:?}", e); return Err(e); } } match knet::log_get_loglevel_id("DEBUG") { Ok(n) => println!("loglevel ID for DEBUG is {}", n), Err(e) => { println!("knet_log_get_loglevel_id failed: {:?}", e); return Err(e); } } match knet::log_get_loglevel_name(1) { Ok(n) => println!("loglevel name for 1 is {}", n), Err(e) => { println!("knet_log_get_loglevel_name failed: {:?}", e); return Err(e); } } if let Err(e) = knet::log_set_loglevel(handle, knet::SubSystem::Handle , knet::LogLevel::Debug) { println!("knet_log_set_loglevel failed: {:?}", e); return Err(e); } match knet::log_get_loglevel(handle, knet::SubSystem::Handle) { Ok(n) => println!("loglevel for Handle is {}", n), Err(e) => { println!("knet_log_get_loglevel failed: {:?}", e); return Err(e); } } Ok(()) } fn test_acl(handle: knet::Handle, host: &knet::HostId, low_port: u16) -> Result<()> { if let Err(e) = knet::handle_enable_access_lists(handle, true) { println!("Error from handle_enable_access_lists: {:?}", e); return Err(e); } // Dynamic link for testing ACL APIs (it never gets used) if let Err(e) = setup_dynamic_link(handle, host, 1, low_port) { println!("Error from link_set_config (dynamic): {}", e); return Err(e); } // These ACLs are nonsense on stilts if let Err(e) = knet::link_add_acl(handle, host, 1, &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8003_u16), &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8003_u16), knet::AclCheckType::Address, knet::AclAcceptReject::Reject) { println!("Error from link_add_acl: {:?}", e); return Err(e); } if let Err(e) = knet::link_insert_acl(handle, host, 1, 0, &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 8004_u16), &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), 8004_u16), knet::AclCheckType::Address, knet::AclAcceptReject::Reject) { println!("Error from link_add_acl: {:?}", e); return Err(e); } if let Err(e) = knet::link_rm_acl(handle, host, 1, &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8003_u16), &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8003_u16), knet::AclCheckType::Address, knet::AclAcceptReject::Reject) { println!("Error from link_rm_acl: {:?}", e); return Err(e); } if let Err(e) = knet::link_clear_acl(handle, host, 1) { println!("Error from link_clear_acl: {:?}", e); return Err(e); } // Get rid of this link before it messes things up if let Err(e) = knet::link_clear_config(handle, host, 1) { println!("clear config (dynamic) failed: {}", e); return Err(e); } if let Err(e) = knet::handle_enable_access_lists(handle, false) { println!("Error from handle_enable_access_lists: {:?}", e); return Err(e); } Ok(()) } fn main() -> Result<()> { // Start with some non-handle information match knet::get_crypto_list() { Ok(l) => { print!("Crypto models:"); for i in &l { print!(" {}", i.name); } println!(); } Err(e) => { println!("link_get_crypto_list failed: {:?}", e); return Err(e); } } match knet::get_compress_list() { Ok(l) => { print!("Compress models:"); for i in &l { print!(" {}", i.name); } println!(); } Err(e) => { println!("link_get_compress_list failed: {:?}", e); return Err(e); } } match knet::get_transport_list() { Ok(l) => { print!("Transports:"); for i in &l { print!(" {}", i.name); } println!(); } Err(e) => { println!("link_get_transport_list failed: {:?}", e); return Err(e); } } let host1 = knet::HostId::new(1); let host2 = knet::HostId::new(2); // Now test traffic let handle1 = setup_node(&host1, &host2, "host2")?; let handle2 = setup_node(&host2, &host1, "host1")?; let low_port = setup_links(handle1, &host1, handle2, &host2)?; configure_link(handle1, &host1, &host2)?; configure_link(handle2, &host2, &host1)?; // Copy stuff for the threads let handle1_clone = handle1; let handle2_clone = handle2; let host1_clone = host1; let host2_clone = host2; // Wait for links to start thread::sleep(time::Duration::from_millis(10000)); test_link_host_list(handle1)?; test_link_host_list(handle2)?; // Start recv threads for each handle let thread_handles = vec![ spawn(move || recv_stuff(handle1_clone, host1_clone)), spawn(move || recv_stuff(handle2_clone, host2_clone)) ]; send_messages(handle1, false)?; send_messages(handle2, false)?; thread::sleep(time::Duration::from_millis(3000)); set_crypto(handle1)?; set_crypto(handle2)?; set_compress(handle1)?; set_compress(handle2)?; thread::sleep(time::Duration::from_millis(3000)); send_messages(handle1, true)?; send_messages(handle2, true)?; test_acl(handle1, &host2, low_port)?; // Wait for recv threads to finish for handle in thread_handles { if let Err(error) = handle.join() { println!("thread join error: {:?}", error); } } // Try some statses match knet::handle_get_stats(handle1) { Ok(s) => println!("handle stats: {}", s), Err(e) => { println!("handle_get_stats failed: {:?}", e); return Err(e); } } match knet::host_get_status(handle1, &host2) { Ok(s) => println!("host status: {}", s), Err(e) => { println!("host_get_status failed: {:?}", e); return Err(e); } } match knet::link_get_status(handle1, &host2, 0) { Ok(s) => println!("link status: {}", s), Err(e) => { println!("link_get_status failed: {:?}", e); return Err(e); } } if let Err(e) = knet::handle_clear_stats(handle1, knet::ClearStats::Handle) { println!("handle_clear_stats failed: {:?}", e); return Err(e); } test_metadata_calls(handle1, &knet::HostId::new(2))?; close_handle(handle1, 2)?; close_handle(handle2, 1)?; // Sleep to see if log thread dies thread::sleep(time::Duration::from_millis(3000)); Ok(()) } diff --git a/libknet/internals.h b/libknet/internals.h index 065d732a..96695778 100644 --- a/libknet/internals.h +++ b/libknet/internals.h @@ -1,463 +1,463 @@ /* * Copyright (C) 2010-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_INTERNALS_H__ #define __KNET_INTERNALS_H__ /* * NOTE: you shouldn't need to include this header normally */ #include #include #include #include "libknet.h" #include "onwire.h" #include "compat.h" #include "threads_common.h" #define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE #define KNET_DATABUFSIZE_CRYPT_PAD 1024 #define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD #define KNET_DATABUFSIZE_COMPRESS_PAD 1024 #define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD #define KNET_RING_RCVBUFF 8388608 #define PCKT_FRAG_MAX UINT8_MAX #define PCKT_RX_BUFS 512 #define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1 /* * Size of threads stack. Value is choosen by experimenting, how much is needed * to sucesfully finish test suite, and at the time of writing patch it was * ~300KiB. To have some room for future enhancement it is increased * by factor of 3 and rounded. */ #define KNET_THREAD_STACK_SIZE (1024 * 1024) typedef void *knet_transport_link_t; /* per link transport handle */ typedef void *knet_transport_t; /* per knet_h transport handle */ struct knet_transport_ops; /* Forward because of circular dependancy */ struct knet_mmsghdr { struct msghdr msg_hdr; /* Message header */ unsigned int msg_len; /* Number of bytes transmitted */ }; struct knet_link { /* required */ struct sockaddr_storage src_addr; struct sockaddr_storage dst_addr; /* configurable */ unsigned int dynamic; /* see KNET_LINK_DYN_ define above */ uint8_t priority; /* higher priority == preferred for A/P */ unsigned long long ping_interval; /* interval */ unsigned long long pong_timeout; /* timeout */ unsigned long long pong_timeout_adj; /* timeout adjusted for latency */ uint8_t pong_timeout_backoff; /* see link.h for definition */ unsigned int latency_max_samples; /* precision */ uint8_t pong_count; /* how many ping/pong to send/receive before link is up */ uint64_t flags; + void *access_list_match_entry_head; /* pointer to access list match_entry list head */ /* status */ struct knet_link_status status; /* internals */ pthread_mutex_t link_stats_mutex; /* used to update link stats */ uint8_t link_id; uint8_t transport; /* #defined constant from API */ knet_transport_link_t transport_link; /* link_info_t from transport */ int outsock; unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/ unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */ uint8_t received_pong; struct timespec ping_last; /* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */ uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused with stats.proto_overhead that includes also knet headers and crypto headers */ struct timespec pmtud_last; uint32_t last_ping_size; uint32_t last_good_mtu; uint32_t last_bad_mtu; uint32_t last_sent_mtu; uint32_t last_recv_mtu; uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */ uint8_t has_valid_mtu; }; #define KNET_DEFRAG_BUFFERS 32 #define KNET_CBUFFER_SIZE 4096 struct knet_host_defrag_buf { char buf[KNET_DATABUFSIZE]; uint8_t in_use; /* 0 buffer is free, 1 is in use */ seq_num_t pckt_seq; /* identify the pckt we are receiving */ uint8_t frag_recv; /* how many frags did we receive */ uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */ uint8_t last_first; /* special case if we receive the last fragment first */ ssize_t frag_size; /* normal frag size (not the last one) */ ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */ struct timespec last_update; /* keep time of the last pckt */ }; struct knet_host { /* required */ knet_node_id_t host_id; /* configurable */ uint8_t link_handler_policy; char name[KNET_MAX_HOST_LEN]; /* status */ struct knet_host_status status; /* * onwire info */ uint8_t onwire_ver; /* node current onwire version */ uint8_t onwire_max_ver; /* node supports up to this version */ /* internals */ char circular_buffer[KNET_CBUFFER_SIZE]; seq_num_t rx_seq_num; seq_num_t untimed_rx_seq_num; seq_num_t timed_rx_seq_num; uint8_t got_data; /* defrag/reassembly buffers */ struct knet_host_defrag_buf defrag_buf[KNET_DEFRAG_BUFFERS]; char circular_buffer_defrag[KNET_CBUFFER_SIZE]; /* link stuff */ struct knet_link link[KNET_MAX_LINK]; uint8_t active_link_entries; uint8_t active_links[KNET_MAX_LINK]; struct knet_host *next; }; struct knet_sock { int sockfd[2]; /* sockfd[0] will always be application facing * and sockfd[1] internal if sockpair has been created by knet */ int is_socket; /* check if it's a socket for recvmmsg usage */ int is_created; /* knet created this socket and has to clean up on exit/del */ int in_use; /* set to 1 if it's use, 0 if free */ int has_error; /* set to 1 if there were errors reading from the sock * and socket has been removed from epoll */ }; struct knet_fd_trackers { uint8_t transport; /* transport type (UDP/SCTP...) */ uint8_t data_type; /* internal use for transport to define what data are associated * with this fd */ socklen_t sockaddr_len; /* Size of sockaddr_in[6] structure for this socket */ void *data; /* pointer to the data */ - void *access_list_match_entry_head; /* pointer to access list match_entry list head */ }; #define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4 #define KNET_MAX_COMPRESS_METHODS UINT8_MAX #define KNET_MAX_CRYPTO_INSTANCES 2 struct knet_handle_stats_extra { uint64_t tx_crypt_pmtu_packets; uint64_t tx_crypt_pmtu_reply_packets; uint64_t tx_crypt_ping_packets; uint64_t tx_crypt_pong_packets; }; struct knet_handle { knet_node_id_t host_id; unsigned int enabled:1; struct knet_sock sockfd[KNET_DATAFD_MAX + 1]; int logfd; uint8_t log_levels[KNET_MAX_SUBSYSTEMS]; int dstsockfd[2]; int send_to_links_epollfd; int recv_from_links_epollfd; int dst_link_handler_epollfd; uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */ unsigned int pmtud_interval; unsigned int manual_mtu; unsigned int data_mtu; /* contains the max data size that we can send onwire * without frags */ struct knet_host *host_head; struct knet_host *host_index[KNET_MAX_HOST]; knet_transport_t transports[KNET_MAX_TRANSPORTS+1]; struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */ struct knet_handle_stats stats; struct knet_handle_stats_extra stats_extra; pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */ uint32_t reconnect_int; knet_node_id_t host_ids[KNET_MAX_HOST]; size_t host_ids_entries; struct knet_header *recv_from_sock_buf; struct knet_header *send_to_links_buf[PCKT_FRAG_MAX]; struct knet_header *recv_from_links_buf[PCKT_RX_BUFS]; struct knet_header *pingbuf; struct knet_header *pmtudbuf; uint8_t threads_status[KNET_THREAD_MAX]; uint8_t threads_flush_queue[KNET_THREAD_MAX]; useconds_t threads_timer_res; pthread_mutex_t threads_status_mutex; pthread_t send_to_links_thread; pthread_t recv_from_links_thread; pthread_t heartbt_thread; pthread_t dst_link_handler_thread; pthread_t pmtud_link_handler_thread; pthread_rwlock_t global_rwlock; /* global config lock */ pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */ pthread_cond_t pmtud_cond; /* conditional for above */ pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */ pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */ pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */ pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */ pthread_mutex_t onwire_mutex; /* used to protect onwire version */ uint8_t onwire_ver; /* currently agreed onwire version across known nodes */ uint8_t onwire_min_ver; /* min and max are constant and don´t need any mutex protection. */ uint8_t onwire_max_ver; /* we define them as part of internal handle so that we can mingle with them for testing purposes */ uint8_t onwire_force_ver; /* manually configure onwire_ver */ uint8_t onwire_ver_remap; /* when this is on, all mapping will use version 1 for now */ uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */ int pmtud_waiting; int pmtud_running; int pmtud_forcerun; int pmtud_abort; struct crypto_instance *crypto_instance[KNET_MAX_CRYPTO_INSTANCES + 1]; /* store an extra pointer to allow 0|1|2 values without too much magic in the code */ uint8_t crypto_in_use_config; /* crypto config to use for TX */ uint8_t crypto_only; /* allow only crypto (1) or also clear (0) traffic */ size_t sec_block_size; size_t sec_hash_size; size_t sec_salt_size; unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX]; unsigned char *recv_from_links_buf_crypt; unsigned char *recv_from_links_buf_decrypt; unsigned char *pingbuf_crypt; unsigned char *pmtudbuf_crypt; int compress_model; int compress_level; size_t compress_threshold; void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */ unsigned char *recv_from_links_buf_decompress; unsigned char *send_to_links_buf_compress; seq_num_t tx_seq_num; pthread_mutex_t tx_seq_num_mutex; uint8_t has_loop_link; uint8_t loop_link; void *dst_host_filter_fn_private_data; int (*dst_host_filter_fn) ( void *private_data, const unsigned char *outdata, ssize_t outdata_len, uint8_t tx_rx, knet_node_id_t this_host_id, knet_node_id_t src_node_id, int8_t *channel, knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries); void *pmtud_notify_fn_private_data; void (*pmtud_notify_fn) ( void *private_data, unsigned int data_mtu); void *host_status_change_notify_fn_private_data; void (*host_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external); void *link_status_change_notify_fn_private_data; void (*link_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t link_id, uint8_t connected, uint8_t remote, uint8_t external); void *sock_notify_fn_private_data; void (*sock_notify_fn) ( void *private_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno); void *onwire_ver_notify_fn_private_data; void (*onwire_ver_notify_fn) ( void *private_data, uint8_t onwire_min_ver, uint8_t onwire_max_ver, uint8_t onwire_ver); int fini_in_progress; uint64_t flags; struct qb_list_head list; const char *plugin_path; }; struct handle_tracker { struct qb_list_head head; }; /* * lib_config stuff shared across everything */ extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */ extern pthread_mutex_t handle_config_mutex; extern struct handle_tracker handle_list; extern uint8_t handle_list_init; int _is_valid_handle(knet_handle_t knet_h); int _init_shlib_tracker(knet_handle_t knet_h); void _fini_shlib_tracker(void); /* * NOTE: every single operation must be implementend * for every protocol. */ /* * for now knet supports only IP protocols (udp/sctp) * in future there might be others like ARP * or TIPC. * keep this around as transport information * to use for access lists and other operations */ #define TRANSPORT_PROTO_LOOPBACK 0 #define TRANSPORT_PROTO_IP_PROTO 1 /* * some transports like SCTP can filter incoming * connections before knet has to process * any packets. * GENERIC_ACL -> packet has to be read and filterted * PROTO_ACL -> transport provides filtering at lower levels * and packet does not need to be processed */ typedef enum { USE_NO_ACL, USE_GENERIC_ACL, USE_PROTO_ACL } transport_acl; /* * make it easier to map values in transports.c */ #define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0 #define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1 typedef struct knet_transport_ops { /* * transport generic information */ const char *transport_name; const uint8_t transport_id; const uint8_t built_in; uint8_t transport_protocol; transport_acl transport_acl_type; /* * connection oriented protocols like SCTP * don´t need dst_addr in sendto calls and * on some OSes are considered EINVAL. */ uint8_t transport_is_connection_oriented; uint32_t transport_mtu_overhead; /* * transport init must allocate the new transport * and perform all internal initializations * (threads, lists, etc). */ int (*transport_init)(knet_handle_t knet_h); /* * transport free must releases _all_ resources * allocated by tranport_init */ int (*transport_free)(knet_handle_t knet_h); /* * link operations should take care of all the * sockets and epoll management for a given link/transport set * transport_link_disable should return err = -1 and errno = EBUSY * if listener is still in use, and any other errno in case * the link cannot be disabled. * * set_config/clear_config are invoked in global write lock context */ int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link); int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link); /* * transport callback for incoming dynamic connections * this is called in global read lock context */ int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link); /* * return the fd to use for access lists */ int (*transport_link_get_acl_fd)(knet_handle_t knet_h, struct knet_link *link); /* * per transport error handling of recvmmsg * (see _handle_recv_from_links comments for details) */ /* * transport_rx_sock_error is invoked when recvmmsg returns <= 0 * * transport_rx_sock_error is invoked with both global_rdlock */ int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * transport_tx_sock_error is invoked with global_rwlock and * it's invoked when sendto or sendmmsg returns =< 0 * * it should return: * -1 on internal error * 0 ignore error and continue * 1 retry * any sleep or wait action should happen inside the transport code */ int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno); /* * this function is called on _every_ received packet * to verify if the packet is data or internal protocol error handling * * it should return: * -1 on error * 0 packet is not data and we should continue the packet process loop * 1 packet is not data and we should STOP the packet process loop * 2 packet is data and should be parsed as such * * transport_rx_is_data is invoked with both global_rwlock * and fd_tracker read lock (from RX thread) */ int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg); /* * this function is called by links.c when a link down event is recorded * to notify the transport that packets are not going through, and give * transport the opportunity to take actions. */ int (*transport_link_is_down)(knet_handle_t knet_h, struct knet_link *link); } knet_transport_ops_t; struct pretty_names { const char *name; uint8_t val; }; #endif diff --git a/libknet/links.c b/libknet/links.c index fe8768ba..fde04d98 100644 --- a/libknet/links.c +++ b/libknet/links.c @@ -1,1568 +1,1568 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "netutils.h" #include "internals.h" #include "logging.h" #include "links.h" #include "transports.h" #include "host.h" #include "threads_common.h" #include "links_acl.h" int _link_updown(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int enabled, unsigned int connected, unsigned int lock_stats) { struct knet_host *host = knet_h->host_index[host_id]; struct knet_link *link = &host->link[link_id]; int notify_status = link->status.connected; int savederrno = 0; if ((link->status.enabled == enabled) && (link->status.connected == connected)) return 0; if ((link->status.enabled) && (knet_h->link_status_change_notify_fn)) { if (link->status.connected != connected) { notify_status = connected; /* connection state */ } if (!enabled) { notify_status = 0; /* disable == disconnected */ } knet_h->link_status_change_notify_fn( knet_h->link_status_change_notify_fn_private_data, host_id, link_id, notify_status, host->status.remote, host->status.external); } link->status.enabled = enabled; link->status.connected = connected; _host_dstcache_update_async(knet_h, host); if ((link->status.dynconnected) && (!link->status.connected)) { link->status.dynconnected = 0; } if (!connected) { transport_link_is_down(knet_h, link); } if (lock_stats) { savederrno = pthread_mutex_lock(&link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s", host_id, link_id, strerror(savederrno)); errno = savederrno; return -1; } } if (connected) { time(&link->status.stats.last_up_times[link->status.stats.last_up_time_index]); link->status.stats.up_count++; if (++link->status.stats.last_up_time_index >= MAX_LINK_EVENTS) { link->status.stats.last_up_time_index = 0; } } else { time(&link->status.stats.last_down_times[link->status.stats.last_down_time_index]); link->status.stats.down_count++; if (++link->status.stats.last_down_time_index >= MAX_LINK_EVENTS) { link->status.stats.last_down_time_index = 0; } } if (lock_stats) { pthread_mutex_unlock(&link->link_stats_mutex); } return 0; } void _link_clear_stats(knet_handle_t knet_h) { struct knet_host *host; struct knet_link *link; uint32_t host_id; uint8_t link_id; for (host_id = 0; host_id < KNET_MAX_HOST; host_id++) { host = knet_h->host_index[host_id]; if (!host) { continue; } for (link_id = 0; link_id < KNET_MAX_LINK; link_id++) { link = &host->link[link_id]; memset(&link->status.stats, 0, sizeof(struct knet_link_stats)); } } } int knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint64_t flags) { int savederrno = 0, err = 0, i, wipelink = 0, link_idx; struct knet_host *host, *tmp_host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (dst_addr && (src_addr->ss_family != dst_addr->ss_family)) { log_err(knet_h, KNET_SUB_LINK, "Source address family does not match destination address family"); errno = EINVAL; return -1; } if (transport >= KNET_MAX_TRANSPORTS) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id != host_id) { log_err(knet_h, KNET_SUB_LINK, "Cannot create loopback link to remote node"); err = -1; savederrno = EINVAL; goto exit_unlock; } if (knet_h->host_id == host_id && knet_h->has_loop_link) { log_err(knet_h, KNET_SUB_LINK, "Cannot create more than 1 link when loopback is active"); err = -1; savederrno = EINVAL; goto exit_unlock; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } if (transport == KNET_TRANSPORT_LOOPBACK && knet_h->host_id == host_id) { for (i=0; ilink[i].configured) { log_err(knet_h, KNET_SUB_LINK, "Cannot add loopback link when other links are already configured."); err = -1; savederrno = EINVAL; goto exit_unlock; } } } link = &host->link[link_id]; if (link->configured != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled != 0) { err =-1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } /* * errors happening after this point should trigger * a memset of the link */ wipelink = 1; copy_sockaddr(&link->src_addr, src_addr); err = knet_addrtostr(src_addr, sizeof(struct sockaddr_storage), link->status.src_ipaddr, KNET_MAX_HOST_LEN, link->status.src_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u source addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } if (!dst_addr) { link->dynamic = KNET_LINK_DYNIP; } else { link->dynamic = KNET_LINK_STATIC; copy_sockaddr(&link->dst_addr, dst_addr); err = knet_addrtostr(dst_addr, sizeof(struct sockaddr_storage), link->status.dst_ipaddr, KNET_MAX_HOST_LEN, link->status.dst_port, KNET_MAX_PORT_LEN); if (err) { if (err == EAI_SYSTEM) { savederrno = errno; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, strerror(savederrno)); } else { savederrno = EINVAL; log_warn(knet_h, KNET_SUB_LINK, "Unable to resolve host: %u link: %u destination addr/port: %s", host_id, link_id, gai_strerror(err)); } err = -1; goto exit_unlock; } } link->pmtud_crypto_timeout_multiplier = KNET_LINK_PMTUD_CRYPTO_TIMEOUT_MULTIPLIER_MIN; link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT; link->has_valid_mtu = 0; link->ping_interval = KNET_LINK_DEFAULT_PING_INTERVAL * 1000; /* microseconds */ link->pong_timeout = KNET_LINK_DEFAULT_PING_TIMEOUT * 1000; /* microseconds */ link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF; link->pong_timeout_adj = link->pong_timeout * link->pong_timeout_backoff; /* microseconds */ link->latency_max_samples = KNET_LINK_DEFAULT_PING_PRECISION; link->status.stats.latency_samples = 0; link->flags = flags; /* * check for DYNIP vs STATIC collisions. * example: link0 is static, user attempts to configure link1 as dynamic with the same source * address/port. * This configuration is invalid and would cause ACL collisions. */ for (tmp_host = knet_h->host_head; tmp_host != NULL; tmp_host = tmp_host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&tmp_host->link[link_idx] == link) continue; if ((!memcmp(&tmp_host->link[link_idx].src_addr, &link->src_addr, sizeof(struct sockaddr_storage))) && (tmp_host->link[link_idx].dynamic != link->dynamic)) { savederrno = EINVAL; err = -1; log_err(knet_h, KNET_SUB_LINK, "Failed to configure host %u link %u dyn %u. Conflicts with host %u link %u dyn %u: %s", host_id, link_id, link->dynamic, tmp_host->host_id, link_idx, tmp_host->link[link_idx].dynamic, strerror(savederrno)); goto exit_unlock; } } } savederrno = pthread_mutex_init(&link->link_stats_mutex, NULL); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to initialize link stats mutex: %s", strerror(savederrno)); err = -1; goto exit_unlock; } if (transport_link_set_config(knet_h, link, transport) < 0) { savederrno = errno; err = -1; goto exit_transport_err; } /* * we can only configure default access lists if we know both endpoints * and the protocol uses GENERIC_ACL, otherwise the protocol has * to setup their own access lists above in transport_link_set_config. */ if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) && (link->dynamic == KNET_LINK_STATIC)) { log_debug(knet_h, KNET_SUB_LINK, "Configuring default access lists for host: %u link: %u socket: %d", host_id, link_id, link->outsock); - if ((check_add(knet_h, link->outsock, transport, -1, + if ((check_add(knet_h, link, -1, &link->dst_addr, &link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { log_warn(knet_h, KNET_SUB_LINK, "Failed to configure default access lists for host: %u link: %u", host_id, link_id); savederrno = errno; err = -1; goto exit_acl_error; } } /* * no errors should happen after link is configured */ link->configured = 1; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is configured", host_id, link_id); if (transport == KNET_TRANSPORT_LOOPBACK) { knet_h->has_loop_link = 1; knet_h->loop_link = link_id; host->status.reachable = 1; link->status.mtu = KNET_PMTUD_SIZE_V6; } else { /* * calculate the minimum MTU that is safe to use, * based on RFCs and that each network device should * be able to support without any troubles */ if (link->dynamic == KNET_LINK_STATIC) { /* * with static link we can be more precise than using * the generic calc_min_mtu() */ switch (link->dst_addr.ss_family) { case AF_INET6: link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V6 - (KNET_PMTUD_OVERHEAD_V6 + link->proto_overhead)); break; case AF_INET: link->status.mtu = calc_max_data_outlen(knet_h, KNET_PMTUD_MIN_MTU_V4 - (KNET_PMTUD_OVERHEAD_V4 + link->proto_overhead)); break; } } else { /* * for dynamic links we start with the minimum MTU * possible and PMTUd will kick in immediately * after connection status is 1 */ link->status.mtu = calc_min_mtu(knet_h); } link->has_valid_mtu = 1; } exit_acl_error: /* * if creating access lists has error, we only need to clean * the transport and the stuff below. */ if (err < 0) { if ((transport_link_clear_config(knet_h, link) < 0) && (errno != EBUSY)) { log_warn(knet_h, KNET_SUB_LINK, "Failed to deconfigure transport for host %u link %u: %s", host_id, link_id, strerror(errno)); } } exit_transport_err: /* * if transport has errors, transport will clean after itself * and we only need to clean the mutex */ if (err < 0) { pthread_mutex_destroy(&link->link_stats_mutex); } exit_unlock: /* * re-init the link on error */ if ((err < 0) && (wipelink)) { memset(link, 0, sizeof(struct knet_link)); link->link_id = link_id; } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *transport, struct sockaddr_storage *src_addr, struct sockaddr_storage *dst_addr, uint8_t *dynamic, uint64_t *flags) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!src_addr) { errno = EINVAL; return -1; } if (!dynamic) { errno = EINVAL; return -1; } if (!transport) { errno = EINVAL; return -1; } if (!flags) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if ((link->dynamic == KNET_LINK_STATIC) && (!dst_addr)) { savederrno = EINVAL; err = -1; goto exit_unlock; } memmove(src_addr, &link->src_addr, sockaddr_len(&link->src_addr)); *transport = link->transport; *flags = link->flags; if (link->dynamic == KNET_LINK_STATIC) { *dynamic = 0; memmove(dst_addr, &link->dst_addr, sockaddr_len(&link->dst_addr)); } else { *dynamic = 1; } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_clear_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; int sock; uint8_t transport; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (link->configured != 1) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled != 0) { err = -1; savederrno = EBUSY; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u is currently in use: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } /* * remove well known access lists here. * After the transport has done clearing the config, * then we can remove any leftover access lists if the link * is no longer in use. */ if ((transport_get_acl_type(knet_h, link->transport) == USE_GENERIC_ACL) && (link->dynamic == KNET_LINK_STATIC)) { - if ((check_rm(knet_h, link->outsock, link->transport, + if ((check_rm(knet_h, link, &link->dst_addr, &link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) { err = -1; savederrno = errno; log_err(knet_h, KNET_SUB_LINK, "Host %u link %u: unable to remove default access list", host_id, link_id); goto exit_unlock; } } /* * cache it for later as we don't know if the transport * will clear link info during clear_config. */ sock = link->outsock; transport = link->transport; if ((transport_link_clear_config(knet_h, link) < 0) && (errno != EBUSY)) { savederrno = errno; err = -1; goto exit_unlock; } /* * remove any other access lists when the socket is no * longer in use by the transport. */ if ((transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL) && (knet_h->knet_transport_fd_tracker[sock].transport == KNET_MAX_TRANSPORTS)) { - check_rmall(knet_h, sock, transport); + check_rmall(knet_h, link); } pthread_mutex_destroy(&link->link_stats_mutex); memset(link, 0, sizeof(struct knet_link)); link->link_id = link_id; if (knet_h->has_loop_link && host_id == knet_h->host_id && link_id == knet_h->loop_link) { knet_h->has_loop_link = 0; if (host->active_link_entries == 0) { host->status.reachable = 0; } } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u config has been wiped", host_id, link_id); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_set_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int enabled) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (enabled > 1) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->status.enabled == enabled) { err = 0; goto exit_unlock; } err = _link_updown(knet_h, host_id, link_id, enabled, link->status.connected, 0); savederrno = errno; if (enabled) { goto exit_unlock; } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u is disabled", host_id, link_id); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_enable(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, unsigned int *enabled) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!enabled) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *enabled = link->status.enabled; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_set_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t pong_count) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (pong_count < 1) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } link->pong_count = pong_count; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u pong count update: %u", host_id, link_id, link->pong_count); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_pong_count(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *pong_count) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!pong_count) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *pong_count = link->pong_count; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_set_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, time_t interval, time_t timeout, unsigned int precision) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } if (!timeout) { errno = ENOSYS; return -1; } if (!precision) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } link->ping_interval = interval * 1000; /* microseconds */ link->pong_timeout = timeout * 1000; /* microseconds */ link->latency_max_samples = precision; log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u timeout update - interval: %llu timeout: %llu precision: %u", host_id, link_id, link->ping_interval, link->pong_timeout, precision); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_ping_timers(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, time_t *interval, time_t *timeout, unsigned int *precision) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!interval) { errno = EINVAL; return -1; } if (!timeout) { errno = EINVAL; return -1; } if (!precision) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *interval = link->ping_interval / 1000; /* microseconds */ *timeout = link->pong_timeout / 1000; *precision = link->latency_max_samples; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_set_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t priority) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; uint8_t old_priority; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } old_priority = link->priority; if (link->priority == priority) { err = 0; goto exit_unlock; } link->priority = priority; if (_host_dstcache_update_sync(knet_h, host)) { savederrno = errno; log_debug(knet_h, KNET_SUB_LINK, "Unable to update link priority (host: %u link: %u priority: %u): %s", host_id, link_id, link->priority, strerror(savederrno)); link->priority = old_priority; err = -1; goto exit_unlock; } log_debug(knet_h, KNET_SUB_LINK, "host: %u link: %u priority set to: %u", host_id, link_id, link->priority); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_priority(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t *priority) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!priority) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } *priority = link->priority; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_link_list(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t *link_ids, size_t *link_ids_entries) { int savederrno = 0, err = 0, i, count = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (!link_ids) { errno = EINVAL; return -1; } if (!link_ids_entries) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } for (i = 0; i < KNET_MAX_LINK; i++) { link = &host->link[i]; if (!link->configured) { continue; } link_ids[count] = i; count++; } *link_ids_entries = count; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_get_status(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, struct knet_link_status *status, size_t struct_size) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } if (!status) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } savederrno = pthread_mutex_lock(&link->link_stats_mutex); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get stats mutex lock for host %u link %u: %s", host_id, link_id, strerror(savederrno)); err = -1; goto exit_unlock; } memmove(status, &link->status, struct_size); pthread_mutex_unlock(&link->link_stats_mutex); /* Calculate totals - no point in doing this on-the-fly */ status->stats.rx_total_packets = status->stats.rx_data_packets + status->stats.rx_ping_packets + status->stats.rx_pong_packets + status->stats.rx_pmtu_packets; status->stats.tx_total_packets = status->stats.tx_data_packets + status->stats.tx_ping_packets + status->stats.tx_pong_packets + status->stats.tx_pmtu_packets; status->stats.rx_total_bytes = status->stats.rx_data_bytes + status->stats.rx_ping_bytes + status->stats.rx_pong_bytes + status->stats.rx_pmtu_bytes; status->stats.tx_total_bytes = status->stats.tx_data_bytes + status->stats.tx_ping_bytes + status->stats.tx_pong_bytes + status->stats.tx_pmtu_bytes; status->stats.tx_total_errors = status->stats.tx_data_errors + status->stats.tx_ping_errors + status->stats.tx_pong_errors + status->stats.tx_pmtu_errors; status->stats.tx_total_retries = status->stats.tx_data_retries + status->stats.tx_ping_retries + status->stats.tx_pong_retries + status->stats.tx_pmtu_retries; /* Tell the caller our full size in case they have an old version */ status->size = sizeof(struct knet_link_status); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } int knet_link_enable_status_change_notify(knet_handle_t knet_h, void *link_status_change_notify_fn_private_data, void (*link_status_change_notify_fn) ( void *private_data, knet_node_id_t host_id, uint8_t link_id, uint8_t connected, uint8_t remote, uint8_t external)) { int savederrno = 0; if (!_is_valid_handle(knet_h)) { return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } knet_h->link_status_change_notify_fn_private_data = link_status_change_notify_fn_private_data; knet_h->link_status_change_notify_fn = link_status_change_notify_fn; if (knet_h->link_status_change_notify_fn) { log_debug(knet_h, KNET_SUB_LINK, "link_status_change_notify_fn enabled"); } else { log_debug(knet_h, KNET_SUB_LINK, "link_status_change_notify_fn disabled"); } pthread_rwlock_unlock(&knet_h->global_rwlock); errno = 0; return 0; } int knet_link_insert_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, int index, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (!ss1) { errno = EINVAL; return -1; } if ((type != CHECK_TYPE_ADDRESS) && (type != CHECK_TYPE_MASK) && (type != CHECK_TYPE_RANGE)) { errno = EINVAL; return -1; } if ((acceptreject != CHECK_ACCEPT) && (acceptreject != CHECK_REJECT)) { errno = EINVAL; return -1; } if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) { errno = EINVAL; return -1; } if ((type == CHECK_TYPE_RANGE) && (ss1->ss_family != ss2->ss_family)) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->dynamic != KNET_LINK_DYNIP) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } - err = check_add(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport, index, + err = check_add(knet_h, link, index, ss1, ss2, type, acceptreject); savederrno = errno; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_add_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { return knet_link_insert_acl(knet_h, host_id, link_id, -1, ss1, ss2, type, acceptreject); } int knet_link_rm_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (!ss1) { errno = EINVAL; return -1; } if ((type != CHECK_TYPE_ADDRESS) && (type != CHECK_TYPE_MASK) && (type != CHECK_TYPE_RANGE)) { errno = EINVAL; return -1; } if ((acceptreject != CHECK_ACCEPT) && (acceptreject != CHECK_REJECT)) { errno = EINVAL; return -1; } if ((type != CHECK_TYPE_ADDRESS) && (!ss2)) { errno = EINVAL; return -1; } if ((type == CHECK_TYPE_RANGE) && (ss1->ss_family != ss2->ss_family)) { errno = EINVAL; return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->dynamic != KNET_LINK_DYNIP) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } - err = check_rm(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport, + err = check_rm(knet_h, link, ss1, ss2, type, acceptreject); savederrno = errno; exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } int knet_link_clear_acl(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id) { int savederrno = 0, err = 0; struct knet_host *host; struct knet_link *link; if (!_is_valid_handle(knet_h)) { return -1; } if (link_id >= KNET_MAX_LINK) { errno = EINVAL; return -1; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s", strerror(savederrno)); errno = savederrno; return -1; } host = knet_h->host_index[host_id]; if (!host) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "Unable to find host %u: %s", host_id, strerror(savederrno)); goto exit_unlock; } link = &host->link[link_id]; if (!link->configured) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is not configured: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } if (link->dynamic != KNET_LINK_DYNIP) { err = -1; savederrno = EINVAL; log_err(knet_h, KNET_SUB_LINK, "host %u link %u is a point to point connection: %s", host_id, link_id, strerror(savederrno)); goto exit_unlock; } - check_rmall(knet_h, transport_link_get_acl_fd(knet_h, link), link->transport); + check_rmall(knet_h, link); exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = savederrno; return err; } diff --git a/libknet/links_acl.c b/libknet/links_acl.c index 1612f15e..66faf244 100644 --- a/libknet/links_acl.c +++ b/libknet/links_acl.c @@ -1,65 +1,67 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include "internals.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "links_acl.h" #include "links_acl_ip.h" #include "links_acl_loopback.h" static check_ops_t proto_check_modules_cmds[] = { { TRANSPORT_PROTO_LOOPBACK, loopbackcheck_validate, loopbackcheck_add, loopbackcheck_rm, loopbackcheck_rmall }, { TRANSPORT_PROTO_IP_PROTO, ipcheck_validate, ipcheck_addip, ipcheck_rmip, ipcheck_rmall } }; /* * all those functions will return errno from the * protocol specific functions */ -int check_add(knet_handle_t knet_h, int sock, uint8_t transport, int index, +int check_add(knet_handle_t knet_h, struct knet_link *kn_link, + int index, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { - return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_add( - &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head, index, + return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_add( + &kn_link->access_list_match_entry_head, index, ss1, ss2, type, acceptreject); } -int check_rm(knet_handle_t knet_h, int sock, uint8_t transport, +int check_rm(knet_handle_t knet_h, struct knet_link *kn_link, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { - return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_rm( - &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head, + return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_rm( + &kn_link->access_list_match_entry_head, ss1, ss2, type, acceptreject); } -void check_rmall(knet_handle_t knet_h, int sock, uint8_t transport) +void check_rmall(knet_handle_t knet_h, struct knet_link *kn_link) { - proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_rmall( - &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head); + proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_rmall( + &kn_link->access_list_match_entry_head); } /* * return 0 to reject and 1 to accept a packet */ -int check_validate(knet_handle_t knet_h, int sock, uint8_t transport, struct sockaddr_storage *checkip) +int check_validate(knet_handle_t knet_h, struct knet_link *kn_link, + struct sockaddr_storage *checkip) { - return proto_check_modules_cmds[transport_get_proto(knet_h, transport)].protocheck_validate( - &knet_h->knet_transport_fd_tracker[sock].access_list_match_entry_head, checkip); + return proto_check_modules_cmds[transport_get_proto(knet_h, kn_link->transport)].protocheck_validate( + &kn_link->access_list_match_entry_head, checkip); } diff --git a/libknet/links_acl.h b/libknet/links_acl.h index 9901c956..3b9fd35e 100644 --- a/libknet/links_acl.h +++ b/libknet/links_acl.h @@ -1,39 +1,44 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #ifndef __KNET_LINKS_ACL_H__ #define __KNET_LINKS_ACL_H__ #include "internals.h" typedef struct { uint8_t transport_proto; int (*protocheck_validate) (void *fd_tracker_match_entry_head, struct sockaddr_storage *checkip); int (*protocheck_add) (void *fd_tracker_match_entry_head, int index, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject); int (*protocheck_rm) (void *fd_tracker_match_entry_head, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject); void (*protocheck_rmall) (void *fd_tracker_match_entry_head); } check_ops_t; -int check_add(knet_handle_t knet_h, int sock, uint8_t transport, int index, +int check_add(knet_handle_t knet_h, struct knet_link *kn_link, + int index, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject); -int check_rm(knet_handle_t knet_h, int sock, uint8_t transport, + +int check_rm(knet_handle_t knet_h, struct knet_link *kn_link, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject); -void check_rmall(knet_handle_t knet_h, int sock, uint8_t transport); -int check_validate(knet_handle_t knet_h, int sock, uint8_t transport, struct sockaddr_storage *checkip); + +void check_rmall(knet_handle_t knet_h, struct knet_link *kn_link); + +int check_validate(knet_handle_t knet_h, struct knet_link *kn_link, + struct sockaddr_storage *checkip); #endif diff --git a/libknet/links_acl_ip.c b/libknet/links_acl_ip.c index ea1755f2..8d562c58 100644 --- a/libknet/links_acl_ip.c +++ b/libknet/links_acl_ip.c @@ -1,302 +1,305 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "internals.h" #include "netutils.h" #include "logging.h" #include "transports.h" #include "links_acl.h" #include "links_acl_ip.h" struct ip_acl_match_entry { check_type_t type; check_acceptreject_t acceptreject; struct sockaddr_storage addr1; /* Actual IP address, mask top or low IP */ struct sockaddr_storage addr2; /* high IP address or address bitmask */ struct ip_acl_match_entry *next; }; /* * IPv4 See if the address we have matches the current match entry */ static int ip_matches_v4(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry) { struct sockaddr_in *ip_to_check; struct sockaddr_in *match1; struct sockaddr_in *match2; ip_to_check = (struct sockaddr_in *)checkip; match1 = (struct sockaddr_in *)&match_entry->addr1; match2 = (struct sockaddr_in *)&match_entry->addr2; switch(match_entry->type) { case CHECK_TYPE_ADDRESS: if (ip_to_check->sin_addr.s_addr == match1->sin_addr.s_addr) return 1; break; case CHECK_TYPE_MASK: if ((ip_to_check->sin_addr.s_addr & match2->sin_addr.s_addr) == match1->sin_addr.s_addr) return 1; break; case CHECK_TYPE_RANGE: if ((ntohl(ip_to_check->sin_addr.s_addr) >= ntohl(match1->sin_addr.s_addr)) && (ntohl(ip_to_check->sin_addr.s_addr) <= ntohl(match2->sin_addr.s_addr))) return 1; break; } return 0; } /* * Compare two IPv6 addresses */ static int ip6addr_cmp(struct in6_addr *a, struct in6_addr *b) { uint64_t a_high, a_low; uint64_t b_high, b_low; a_high = ((uint64_t)htonl(a->s6_addr32[0]) << 32) | (uint64_t)htonl(a->s6_addr32[1]); a_low = ((uint64_t)htonl(a->s6_addr32[2]) << 32) | (uint64_t)htonl(a->s6_addr32[3]); b_high = ((uint64_t)htonl(b->s6_addr32[0]) << 32) | (uint64_t)htonl(b->s6_addr32[1]); b_low = ((uint64_t)htonl(b->s6_addr32[2]) << 32) | (uint64_t)htonl(b->s6_addr32[3]); if (a_high > b_high) return 1; if (a_high < b_high) return -1; if (a_low > b_low) return 1; if (a_low < b_low) return -1; return 0; } /* * IPv6 See if the address we have matches the current match entry */ static int ip_matches_v6(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry) { struct sockaddr_in6 *ip_to_check; struct sockaddr_in6 *match1; struct sockaddr_in6 *match2; int i; ip_to_check = (struct sockaddr_in6 *)checkip; match1 = (struct sockaddr_in6 *)&match_entry->addr1; match2 = (struct sockaddr_in6 *)&match_entry->addr2; switch(match_entry->type) { case CHECK_TYPE_ADDRESS: if (!memcmp(ip_to_check->sin6_addr.s6_addr32, match1->sin6_addr.s6_addr32, sizeof(struct in6_addr))) return 1; break; case CHECK_TYPE_MASK: /* * Note that this little loop will quit early if there is a non-match so the * comparison might look backwards compared to the IPv4 one */ for (i=sizeof(struct in6_addr)/4-1; i>=0; i--) { if ((ip_to_check->sin6_addr.s6_addr32[i] & match2->sin6_addr.s6_addr32[i]) != match1->sin6_addr.s6_addr32[i]) return 0; } return 1; case CHECK_TYPE_RANGE: if ((ip6addr_cmp(&ip_to_check->sin6_addr, &match1->sin6_addr) >= 0) && (ip6addr_cmp(&ip_to_check->sin6_addr, &match2->sin6_addr) <= 0)) return 1; break; } return 0; } int ipcheck_validate(void *fd_tracker_match_entry_head, struct sockaddr_storage *checkip) { struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head; struct ip_acl_match_entry *match_entry = *match_entry_head; int (*match_fn)(struct sockaddr_storage *checkip, struct ip_acl_match_entry *match_entry); if (checkip->ss_family == AF_INET) { match_fn = ip_matches_v4; } else { match_fn = ip_matches_v6; } while (match_entry) { if (match_fn(checkip, match_entry)) { if (match_entry->acceptreject == CHECK_ACCEPT) return 1; else return 0; } match_entry = match_entry->next; } return 0; /* Default reject */ } /* * Routines to manuipulate access lists */ void ipcheck_rmall(void *fd_tracker_match_entry_head) { struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head; struct ip_acl_match_entry *next_match_entry; struct ip_acl_match_entry *match_entry = *match_entry_head; while (match_entry) { next_match_entry = match_entry->next; free(match_entry); match_entry = next_match_entry; } *match_entry_head = NULL; } static struct ip_acl_match_entry *ipcheck_findmatch(struct ip_acl_match_entry **match_entry_head, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { struct ip_acl_match_entry *match_entry = *match_entry_head; while (match_entry) { - if ((!memcmp(&match_entry->addr1, ss1, sizeof(struct sockaddr_storage))) && - (!memcmp(&match_entry->addr2, ss2, sizeof(struct sockaddr_storage))) && + if ((!memcmp(&match_entry->addr1, ss1, sockaddr_len(ss1))) && + ((match_entry->type == CHECK_TYPE_ADDRESS) || + ((match_entry->type != CHECK_TYPE_ADDRESS) && ss2 && !memcmp(&match_entry->addr2, ss2, sockaddr_len(ss1)))) && (match_entry->type == type) && (match_entry->acceptreject == acceptreject)) { return match_entry; } match_entry = match_entry->next; } return NULL; } int ipcheck_rmip(void *fd_tracker_match_entry_head, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head; struct ip_acl_match_entry *next_match_entry = NULL; struct ip_acl_match_entry *rm_match_entry; struct ip_acl_match_entry *match_entry = *match_entry_head; rm_match_entry = ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject); if (!rm_match_entry) { errno = ENOENT; return -1; } while (match_entry) { next_match_entry = match_entry->next; /* * we are removing the list head, be careful */ if (rm_match_entry == match_entry) { *match_entry_head = next_match_entry; free(match_entry); break; } /* * the next one is the one we need to remove */ if (rm_match_entry == next_match_entry) { match_entry->next = next_match_entry->next; free(next_match_entry); break; } match_entry = next_match_entry; } return 0; } int ipcheck_addip(void *fd_tracker_match_entry_head, int index, struct sockaddr_storage *ss1, struct sockaddr_storage *ss2, check_type_t type, check_acceptreject_t acceptreject) { struct ip_acl_match_entry **match_entry_head = (struct ip_acl_match_entry **)fd_tracker_match_entry_head; struct ip_acl_match_entry *new_match_entry; struct ip_acl_match_entry *match_entry = *match_entry_head; int i = 0; if (ipcheck_findmatch(match_entry_head, ss1, ss2, type, acceptreject) != NULL) { errno = EEXIST; return -1; } new_match_entry = malloc(sizeof(struct ip_acl_match_entry)); if (!new_match_entry) { return -1; } copy_sockaddr(&new_match_entry->addr1, ss1); - copy_sockaddr(&new_match_entry->addr2, ss2); + if (ss2) { + copy_sockaddr(&new_match_entry->addr2, ss2); + } new_match_entry->type = type; new_match_entry->acceptreject = acceptreject; new_match_entry->next = NULL; if (match_entry) { /* * special case for index 0, since we need to update * the head of the list */ if (index == 0) { *match_entry_head = new_match_entry; new_match_entry->next = match_entry; } else { /* * find the end of the list or stop at "index" */ while (match_entry->next) { match_entry = match_entry->next; if (i == index) { break; } i++; } /* * insert if there are more entries in the list */ if (match_entry->next) { new_match_entry->next = match_entry->next; } /* * add if we are at the end */ match_entry->next = new_match_entry; } } else { /* * first entry in the list */ *match_entry_head = new_match_entry; } return 0; } diff --git a/libknet/tests/Makefile.am b/libknet/tests/Makefile.am index d873157f..f2481ab3 100644 --- a/libknet/tests/Makefile.am +++ b/libknet/tests/Makefile.am @@ -1,117 +1,121 @@ # # Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. # # Authors: Fabio M. Di Nitto # # This software licensed under GPL-2.0+ # MAINTAINERCLEANFILES = Makefile.in include $(top_srcdir)/build-aux/check.mk include $(top_srcdir)/libknet/tests/api-check.mk EXTRA_DIST = \ api-test-coverage \ api-check.mk AM_CPPFLAGS = -I$(top_srcdir)/libknet AM_CFLAGS += $(PTHREAD_CFLAGS) $(libqb_CFLAGS) LIBS = $(top_builddir)/libknet/libknet.la \ $(PTHREAD_LIBS) $(dl_LIBS) noinst_HEADERS = \ test-common.h # the order of those tests is NOT random. # some functions can only be tested properly after some dependents # API have been validated upfront. check_PROGRAMS = \ $(api_checks) \ $(int_checks) \ $(fun_checks) int_checks = \ int_links_acl_ip_test \ int_timediff_test fun_checks = \ fun_config_crypto_test \ - fun_onwire_upgrade_test + fun_onwire_upgrade_test \ + fun_acl_check_test # checks below need to be executed manually # or with a specifi environment long_run_checks = \ fun_pmtud_crypto_test benchmarks = \ knet_bench_test noinst_PROGRAMS = \ api_knet_handle_new_limit_test \ pckt_test \ $(benchmarks) \ $(long_run_checks) \ $(check_PROGRAMS) noinst_SCRIPTS = \ api-test-coverage TESTS = $(check_PROGRAMS) if INSTALL_TESTS testsuitedir = $(TESTDIR) testsuite_PROGRAMS = $(noinst_PROGRAMS) endif check-local: check-api-test-coverage check-api-test-coverage: chmod u+x $(top_srcdir)/libknet/tests/api-test-coverage $(top_srcdir)/libknet/tests/api-test-coverage $(top_srcdir) $(top_builddir) pckt_test_SOURCES = pckt_test.c int_links_acl_ip_test_SOURCES = int_links_acl_ip.c \ ../common.c \ ../compat.c \ ../logging.c \ ../netutils.c \ ../threads_common.c \ ../onwire.c \ ../transports.c \ ../transport_common.c \ ../transport_loopback.c \ ../transport_sctp.c \ ../transport_udp.c \ ../links_acl.c \ ../links_acl_ip.c \ ../links_acl_loopback.c \ ../lib_config.c int_timediff_test_SOURCES = int_timediff.c knet_bench_test_SOURCES = knet_bench.c \ test-common.c \ ../common.c \ ../logging.c \ ../compat.c \ ../transport_common.c \ ../threads_common.c \ ../onwire.c \ ../lib_config.c fun_pmtud_crypto_test_SOURCES = fun_pmtud_crypto.c \ test-common.c \ ../onwire.c \ ../logging.c \ ../threads_common.c \ ../lib_config.c fun_config_crypto_test_SOURCES = fun_config_crypto.c \ test-common.c fun_onwire_upgrade_test_SOURCES = fun_onwire_upgrade.c \ test-common.c + +fun_acl_check_test_SOURCES = fun_acl_check.c \ + test-common.c diff --git a/libknet/tests/api_knet_link_add_acl.c b/libknet/tests/api_knet_link_add_acl.c index 4f6f8c91..759afdc2 100644 --- a/libknet/tests/api_knet_link_add_acl.c +++ b/libknet/tests/api_knet_link_add_acl.c @@ -1,246 +1,246 @@ /* * Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; int logfds[2]; struct knet_host *host; struct knet_link *link; struct sockaddr_storage lo, lo6; if (make_local_sockaddr(&lo, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } if (make_local_sockaddr6(&lo6, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } printf("Test knet_link_add_acl incorrect knet_h\n"); if ((!knet_link_add_acl(NULL, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); printf("Test knet_link_add_acl with unconfigured host\n"); if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with unconfigured link\n"); if (knet_host_add(knet_h, 1) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with invalid link\n"); if ((!knet_link_add_acl(knet_h, 1, KNET_MAX_LINK, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with invalid ss1\n"); if ((!knet_link_add_acl(knet_h, 1, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with invalid ss2\n"); if ((!knet_link_add_acl(knet_h, 1, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with non matching families\n"); if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with wrong check_type\n"); if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with wrong acceptreject\n"); if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_add_acl with point to point link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_add_acl accepted point to point link or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); printf("Test knet_link_add_acl with dynamic link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list not empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) { printf("knet_link_add_acl did not accept dynamic link error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } - if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (!link->access_list_match_entry_head) { printf("match list empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_link_clear_acl.c b/libknet/tests/api_knet_link_clear_acl.c index abbf61bb..49cb84b1 100644 --- a/libknet/tests/api_knet_link_clear_acl.c +++ b/libknet/tests/api_knet_link_clear_acl.c @@ -1,191 +1,191 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; int logfds[2]; struct knet_host *host; struct knet_link *link; struct sockaddr_storage lo; printf("Test knet_link_clear_acl incorrect knet_h\n"); if ((!knet_link_clear_acl(NULL, 1, 0)) || (errno != EINVAL)) { printf("knet_link_clear_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); printf("Test knet_link_clear_acl with unconfigured host\n"); if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) { printf("knet_link_clear_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_clear_acl with unconfigured link\n"); if (knet_host_add(knet_h, 1) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) { printf("knet_link_clear_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_clear_acl with invalid link\n"); if ((!knet_link_clear_acl(knet_h, 1, KNET_MAX_LINK)) || (errno != EINVAL)) { printf("knet_link_clear_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_clear_acl with point to point link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_clear_acl(knet_h, 1, 0)) || (errno != EINVAL)) { printf("knet_link_clear_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); printf("Test knet_link_clear_acl with dynamic link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list NOT empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) { printf("knet_link_clear_acl did not accept dynamic link error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } - if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (!link->access_list_match_entry_head) { printf("match list empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_clear_acl(knet_h, 1, 0) < 0) { printf("knet_link_clear_acl failed to clear. error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list NOT empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_link_insert_acl.c b/libknet/tests/api_knet_link_insert_acl.c index c0a4de3e..a5de2378 100644 --- a/libknet/tests/api_knet_link_insert_acl.c +++ b/libknet/tests/api_knet_link_insert_acl.c @@ -1,246 +1,246 @@ /* * Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; int logfds[2]; struct knet_host *host; struct knet_link *link; struct sockaddr_storage lo, lo6; if (make_local_sockaddr(&lo, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } if (make_local_sockaddr6(&lo6, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } printf("Test knet_link_insert_acl incorrect knet_h\n"); if ((!knet_link_insert_acl(NULL, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); printf("Test knet_link_insert_acl with unconfigured host\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with unconfigured link\n"); if (knet_host_add(knet_h, 1) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with invalid link\n"); if ((!knet_link_insert_acl(knet_h, 1, KNET_MAX_LINK, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with invalid ss1\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with invalid ss2\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with non matching families\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with wrong check_type\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with wrong acceptreject\n"); if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_insert_acl with point to point link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_insert_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); printf("Test knet_link_insert_acl with dynamic link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list not empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_insert_acl(knet_h, 1, 0, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) { printf("knet_link_insert_acl did not accept dynamic link error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } - if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (!link->access_list_match_entry_head) { printf("match list empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_link_rm_acl.c b/libknet/tests/api_knet_link_rm_acl.c index b3300dd1..7af3fd0a 100644 --- a/libknet/tests/api_knet_link_rm_acl.c +++ b/libknet/tests/api_knet_link_rm_acl.c @@ -1,256 +1,256 @@ /* * Copyright (C) 2019-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; int logfds[2]; struct knet_host *host; struct knet_link *link; struct sockaddr_storage lo, lo6; if (make_local_sockaddr(&lo, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } if (make_local_sockaddr6(&lo6, 0) < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); exit(FAIL); } printf("Test knet_link_rm_acl incorrect knet_h\n"); if ((!knet_link_rm_acl(NULL, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); printf("Test knet_link_rm_acl with unconfigured host\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted unconfigured host or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with unconfigured link\n"); if (knet_host_add(knet_h, 1) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted unconfigured link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with invalid link\n"); if ((!knet_link_rm_acl(knet_h, 1, KNET_MAX_LINK, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted invalid link or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with invalid ss1\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, NULL, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted invalid ss1 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with invalid ss2\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, NULL, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted invalid ss2 or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with non matching families\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo6, CHECK_TYPE_RANGE, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted non matching families or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with wrong check_type\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_RANGE + CHECK_TYPE_MASK + CHECK_TYPE_ADDRESS + 1, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with wrong acceptreject\n"); if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT + CHECK_REJECT + 1)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted incorrect check_type or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_rm_acl with point to point link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 0, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)) || (errno != EINVAL)) { printf("knet_link_rm_acl accepted point ot point link or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); printf("Test knet_link_rm_acl with dynamic link\n"); if (_knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, 0, AF_INET, 1, &lo) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list not empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_add_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) { printf("Failed to add an access list: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_rm_acl(knet_h, 1, 0, &lo, &lo, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) { printf("knet_link_rm_acl did not accept dynamic link error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("match list NOT empty!"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_link_set_config.c b/libknet/tests/api_knet_link_set_config.c index 402b51c6..14025649 100644 --- a/libknet/tests/api_knet_link_set_config.c +++ b/libknet/tests/api_knet_link_set_config.c @@ -1,313 +1,313 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "links.h" #include "netutils.h" #include "test-common.h" static void test(void) { knet_handle_t knet_h; struct knet_host *host; struct knet_link *link; int logfds[2]; char lo_portstr[32]; struct sockaddr_storage lo, lo6; struct sockaddr_in *lo_in = (struct sockaddr_in *)&lo; struct knet_link_status link_status; if (make_local_sockaddr(&lo, -1) < 0) { printf("Unable to convert src to sockaddr: %s\n", strerror(errno)); exit(FAIL); } sprintf(lo_portstr, "%d", ntohs(lo_in->sin_port)); printf("Test knet_link_set_config incorrect knet_h\n"); if ((!knet_link_set_config(NULL, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) { printf("knet_link_set_config accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); printf("Test knet_link_set_config with unconfigured host_id\n"); if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) { printf("knet_link_set_config accepted invalid host_id or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } printf("Test knet_link_set_config with bad transport type\n"); if ((!knet_link_set_config(knet_h, 1, 0, KNET_MAX_TRANSPORTS, &lo, &lo, 0)) || (errno != EINVAL)) { printf("knet_link_set_config accepted invalid transport or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_set_config with incorrect linkid\n"); if (knet_host_add(knet_h, 1) < 0) { printf("Unable to add host_id 1: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_set_config(knet_h, 1, KNET_MAX_LINK, KNET_TRANSPORT_UDP, &lo, &lo, 0)) || (errno != EINVAL)) { printf("knet_link_set_config accepted invalid linkid or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_set_config with incorrect src_addr\n"); if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, NULL, &lo, 0)) || (errno != EINVAL)) { printf("knet_link_set_config accepted invalid src_addr or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_set_config with conflicting address families\n"); if (make_local_sockaddr6(&lo6, -1) < 0) { printf("Unable to convert dst to sockaddr: %s\n", strerror(errno)); exit(FAIL); } if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo6, 0) == 0) { printf("knet_link_set_config accepted invalid address families: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_set_config with dynamic dst_addr\n"); if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (link->access_list_match_entry_head) { printf("found access lists for dynamic dst_addr!\n"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) { printf("Unable to get link status: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((link_status.enabled != 0) || (strcmp(link_status.src_ipaddr, "127.0.0.1")) || (strcmp(link_status.src_port, lo_portstr)) || (knet_h->host_index[1]->link[0].dynamic != KNET_LINK_DYNIP)) { printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dynamic %u\n", link_status.enabled, link_status.src_ipaddr, link_status.src_port, knet_h->host_index[1]->link[0].dynamic); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_link_set_config with dynamic link (0) and static link (1)\n"); if (!knet_link_set_config(knet_h, 1, 1, KNET_TRANSPORT_UDP, &lo, &lo, 0) || (errno != EINVAL)) { printf("knet_link_set_config accepted request mixed static/dynamic request or returned incorrect error: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } printf("Test knet_link_set_config with already configured link\n"); if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0) || (errno != EBUSY))) { printf("knet_link_set_config accepted request while link configured or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } printf("Test knet_link_set_config with link enabled\n"); if (knet_link_set_enable(knet_h, 1, 0, 1) < 0) { printf("Unable to enable link: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) { printf("Unable to get link status: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((!knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, NULL, 0)) || (errno != EBUSY)) { printf("knet_link_set_config accepted request while link enabled or returned incorrect error: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_set_enable(knet_h, 1, 0, 0) < 0) { printf("Unable to disable link: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_clear_config(knet_h, 1, 0) < 0) { printf("Unable to clear link config: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } printf("Test knet_link_set_config with static dst_addr\n"); if (knet_link_set_config(knet_h, 1, 0, KNET_TRANSPORT_UDP, &lo, &lo, 0) < 0) { printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } host = knet_h->host_index[1]; link = &host->link[0]; - if (!knet_h->knet_transport_fd_tracker[link->outsock].access_list_match_entry_head) { + if (!link->access_list_match_entry_head) { printf("Unable to find default access lists for static dst_addr!\n"); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(struct knet_link_status)) < 0) { printf("Unable to get link status: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if ((link_status.enabled != 0) || (strcmp(link_status.src_ipaddr, "127.0.0.1")) || (strcmp(link_status.src_port, lo_portstr)) || (strcmp(link_status.dst_ipaddr, "127.0.0.1")) || (strcmp(link_status.dst_port, lo_portstr)) || (knet_h->host_index[1]->link[0].dynamic != KNET_LINK_STATIC)) { printf("knet_link_set_config failed to set configuration. enabled: %d src_addr %s src_port %s dst_addr %s dst_port %s dynamic %u\n", link_status.enabled, link_status.src_ipaddr, link_status.src_port, link_status.dst_ipaddr, link_status.dst_port, knet_h->host_index[1]->link[0].dynamic); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { test(); return PASS; } diff --git a/libknet/tests/api_knet_send.c b/libknet/tests/api_knet_send.c index f4ffb3df..a3b5aa49 100644 --- a/libknet/tests/api_knet_send.c +++ b/libknet/tests/api_knet_send.c @@ -1,368 +1,376 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include "libknet.h" #include "internals.h" #include "netutils.h" #include "test-common.h" static int private_data; static void sock_notify(void *pvt_data, int datafd, int8_t channel, uint8_t tx_rx, int error, int errorno) { return; } static void test(uint8_t transport) { knet_handle_t knet_h; int logfds[2]; int datafd = 0; int8_t channel = 0; struct knet_link_status link_status; char send_buff[KNET_MAX_PACKET_SIZE + 1]; char recv_buff[KNET_MAX_PACKET_SIZE]; ssize_t send_len = 0; int recv_len = 0; int savederrno; struct sockaddr_storage lo; memset(send_buff, 0, sizeof(send_buff)); printf("Test knet_send incorrect knet_h\n"); if ((!knet_send(NULL, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid knet_h or returned incorrect error: %s\n", strerror(errno)); exit(FAIL); } setup_logpipes(logfds); knet_h = knet_handle_start(logfds, KNET_LOG_DEBUG); + if (knet_handle_enable_access_lists(knet_h, 1) < 0) { + printf("Cannot enable access lists: %s\n", strerror(errno)); + knet_handle_free(knet_h); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + exit(FAIL); + } + printf("Test knet_send with no send_buff\n"); if ((!knet_send(knet_h, NULL, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid send_buff or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with invalid send_buff len (0)\n"); if ((!knet_send(knet_h, send_buff, 0, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid send_buff len (0) or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with invalid send_buff len (> KNET_MAX_PACKET_SIZE)\n"); if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE + 1, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid send_buff len (> KNET_MAX_PACKET_SIZE) or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with invalid channel (-1)\n"); channel = -1; if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid channel (-1) or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with invalid channel (KNET_DATAFD_MAX)\n"); channel = KNET_DATAFD_MAX; if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid channel (KNET_DATAFD_MAX) or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with unconfigured channel\n"); channel = 0; if ((!knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel)) || (errno != EINVAL)) { printf("knet_send accepted invalid unconfigured channel or returned incorrect error: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); printf("Test knet_send with valid data\n"); if (knet_handle_enable_access_lists(knet_h, 1) < 0) { printf("knet_handle_enable_access_lists failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) { printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } datafd = 0; channel = -1; if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) { printf("knet_handle_add_datafd failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_host_add(knet_h, 1) < 0) { printf("knet_host_add failed: %s\n", strerror(errno)); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (_knet_link_set_config(knet_h, 1, 0, transport, 0, AF_INET, 0, &lo) < 0 ) { int exit_status = transport == KNET_TRANSPORT_SCTP && errno == EPROTONOSUPPORT ? SKIP : FAIL; printf("Unable to configure link: %s\n", strerror(errno)); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(exit_status); } if (knet_link_set_enable(knet_h, 1, 0, 1) < 0) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (wait_for_host(knet_h, 1, 10, logfds[0], stdout) < 0) { printf("timeout waiting for host to be reachable\n"); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } send_len = knet_send(knet_h, send_buff, KNET_MAX_PACKET_SIZE, channel); if (send_len <= 0) { printf("knet_send failed: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (send_len != sizeof(send_buff) - 1) { printf("knet_send sent only %zd bytes: %s\n", send_len, strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } flush_logs(logfds[0], stdout); if (knet_handle_setfwd(knet_h, 0) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) { printf("Error waiting for packet: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } recv_len = knet_recv(knet_h, recv_buff, KNET_MAX_PACKET_SIZE, channel); savederrno = errno; if (recv_len != send_len) { printf("knet_recv received only %d bytes: %s (errno: %d)\n", recv_len, strerror(errno), errno); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); if ((is_helgrind()) && (recv_len == -1) && (savederrno == EAGAIN)) { printf("helgrind exception. this is normal due to possible timeouts\n"); exit(PASS); } exit(FAIL); } if (memcmp(recv_buff, send_buff, KNET_MAX_PACKET_SIZE)) { printf("recv and send buffers are different!\n"); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } /* A sanity check on the stats */ if (knet_link_get_status(knet_h, 1, 0, &link_status, sizeof(link_status)) < 0) { printf("knet_link_get_status failed: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } if (link_status.stats.tx_data_packets != 2 || link_status.stats.rx_data_packets != 2 || link_status.stats.tx_data_bytes < KNET_MAX_PACKET_SIZE || link_status.stats.rx_data_bytes < KNET_MAX_PACKET_SIZE || link_status.stats.tx_data_bytes > KNET_MAX_PACKET_SIZE*2 || link_status.stats.rx_data_bytes > KNET_MAX_PACKET_SIZE*2) { printf("stats look wrong: tx_packets: %" PRIu64 " (%" PRIu64 " bytes), rx_packets: %" PRIu64 " (%" PRIu64 " bytes)\n", link_status.stats.tx_data_packets, link_status.stats.tx_data_bytes, link_status.stats.rx_data_packets, link_status.stats.rx_data_bytes); } flush_logs(logfds[0], stdout); if (knet_handle_setfwd(knet_h, 1) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } printf("try to send big packet to local datafd (bypass knet_send)\n"); if (write(datafd, &send_buff, sizeof(send_buff)) != KNET_MAX_PACKET_SIZE + 1) { printf("Error writing to datafd: %s\n", strerror(errno)); } if (!wait_for_packet(knet_h, 2, datafd, logfds[0], stdout)) { printf("Received unexpected packet!\n"); knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } knet_link_set_enable(knet_h, 1, 0, 0); knet_link_clear_config(knet_h, 1, 0); knet_host_remove(knet_h, 1); knet_handle_free(knet_h); flush_logs(logfds[0], stdout); close_logpipes(logfds); } int main(int argc, char *argv[]) { printf("Testing with UDP\n"); test(KNET_TRANSPORT_UDP); #ifdef HAVE_NETINET_SCTP_H printf("Testing with SCTP\n"); test(KNET_TRANSPORT_SCTP); #endif return PASS; } diff --git a/libknet/tests/fun_acl_check.c b/libknet/tests/fun_acl_check.c new file mode 100644 index 00000000..55cf09bb --- /dev/null +++ b/libknet/tests/fun_acl_check.c @@ -0,0 +1,409 @@ +/* + * Copyright (C) 2021 Red Hat, Inc. All rights reserved. + * + * Authors: Christine Caulfield + * + * This software licensed under GPL-2.0+ + */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "libknet.h" + +#include "internals.h" +#include "netutils.h" +#include "test-common.h" + + +/* + * Keep track of how many messages got through: + * clean + 3xACLs + QUIT + */ +#define CORRECT_NUM_MSGS 5 +static int msgs_recvd = 0; + +#define TESTNODES 2 + +static pthread_mutex_t recv_mutex = PTHREAD_MUTEX_INITIALIZER; +static int quit_recv_thread = 0; + +static int reply_pipe[2]; + +#define FAIL_ON_ERR(fn) \ + printf("FOE: %s\n", #fn); \ + if ((res = fn) != 0) { \ + int savederrno = errno; \ + pthread_mutex_lock(&recv_mutex); \ + quit_recv_thread = 1; \ + pthread_mutex_unlock(&recv_mutex); \ + if (recv_thread) { \ + pthread_join(recv_thread, (void**)&thread_err); \ + } \ + knet_handle_stop_nodes(knet_h, TESTNODES); \ + stop_logthread(); \ + flush_logs(logfds[0], stdout); \ + close_logpipes(logfds); \ + close(reply_pipe[0]); \ + close(reply_pipe[1]); \ + if (res == -2) { \ + exit(SKIP); \ + } else { \ + printf("*** FAIL on line %d %s failed: %s\n", __LINE__ , #fn, strerror(savederrno)); \ + exit(FAIL); \ + } \ + } + +static int knet_send_str(knet_handle_t knet_h, char *str) +{ + return knet_send_sync(knet_h, str, strlen(str)+1, 0); +} + +/* + * lo0 is filled in with the local address on return. + * lo1 is expected to be provided - it's the actual remote address to connect to. + */ +int dyn_knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, + uint8_t transport, uint64_t flags, int family, int dynamic, + struct sockaddr_storage *lo0, struct sockaddr_storage *lo1) +{ + int err = 0, savederrno = 0; + uint32_t port; + char portstr[32]; + + for (port = 1025; port < 65536; port++) { + sprintf(portstr, "%u", port); + memset(lo0, 0, sizeof(struct sockaddr_storage)); + if (family == AF_INET6) { + err = knet_strtoaddr("::1", portstr, lo0, sizeof(struct sockaddr_storage)); + } else { + err = knet_strtoaddr("127.0.0.1", portstr, lo0, sizeof(struct sockaddr_storage)); + } + if (err < 0) { + printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); + goto out; + } + errno = 0; + if (dynamic) { + err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, NULL, flags); + } else { + err = knet_link_set_config(knet_h, host_id, link_id, transport, lo0, lo1, flags); + } + savederrno = errno; + if ((err < 0) && (savederrno != EADDRINUSE)) { + if (savederrno == EPROTONOSUPPORT && transport == KNET_TRANSPORT_SCTP) { + return -2; + } else { + printf("Unable to configure link: %s\n", strerror(savederrno)); + goto out; + } + } + if (!err) { + printf("Using port %u\n", port); + goto out; + } + } + + if (err) { + printf("No more ports available\n"); + } +out: + errno = savederrno; + return err; +} + +static void *recv_messages(void *handle) +{ + knet_handle_t knet_h = (knet_handle_t)handle; + char buf[4096]; + ssize_t len; + static int err = 0; + int savederrno = 0, quit = 0; + + while ((len = knet_recv(knet_h, buf, sizeof(buf), 0)) && (!quit)) { + savederrno = errno; + pthread_mutex_lock(&recv_mutex); + quit = quit_recv_thread; + pthread_mutex_unlock(&recv_mutex); + if (quit) { + printf(" *** recv thread was requested to exit via FOE\n"); + err = 1; + return &err; + } + if (len > 0) { + int res; + + printf("recv: (%ld) %s\n", (long)len, buf); + msgs_recvd++; + if (strcmp("QUIT", buf) == 0) { + break; + } + if (buf[0] == '0') { /* We should not have received this! */ + printf(" *** FAIL received packet that should have been blocked\n"); + err = 1; + return &err; + } + /* Tell the main thread we have received something */ + res = write(reply_pipe[1], ".", 1); + if (res != 1) { + printf(" *** FAIL to send response back to main thread\n"); + err = 1; + return &err; + } + } + usleep(1000); + if (len < 0 && savederrno != EAGAIN) { + break; + } + } + printf("-- recv thread finished: %zd %d %s\n", len, errno, strerror(savederrno)); + return &err; +} + +static void notify_fn(void *private_data, + int datafd, + int8_t channel, + uint8_t tx_rx, + int error, + int errorno) +{ + printf("NOTIFY fn called\n"); +} + +/* A VERY basic filter because all data traffic is going to one place */ +static int dhost_filter(void *pvt_data, + const unsigned char *outdata, + ssize_t outdata_len, + uint8_t tx_rx, + knet_node_id_t this_host_id, + knet_node_id_t src_host_id, + int8_t *dst_channel, + knet_node_id_t *dst_host_ids, + size_t *dst_host_ids_entries) +{ + dst_host_ids[0] = 1; + *dst_host_ids_entries = 1; + return 0; +} + +/* This used to be a pthread condition variable, but + there was a race where it could be triggered before + the main thread was waiting for it. + Go old-fashioned. +*/ +static int wait_for_reply(int seconds) +{ + int res; + struct pollfd pfds; + char tmpbuf[32]; + + pfds.fd = reply_pipe[0]; + pfds.events = POLLIN | POLLERR | POLLHUP; + pfds.revents = 0; + + res = poll(&pfds, 1, seconds*1000); + if (res == 1) { + if (pfds.revents & POLLIN) { + res = read(reply_pipe[0], tmpbuf, sizeof(tmpbuf)); + if (res > 0) { + return 0; + } + } else { + printf("Error on pipe poll revent = 0x%x\n", pfds.revents); + errno = EIO; + } + } + if (res == 0) { + errno = ETIMEDOUT; + return -1; + } + + return -1; +} + +static void test(int transport) +{ + knet_handle_t knet_h[TESTNODES+1]; + int logfds[2]; + struct sockaddr_storage lo0, lo1; + struct sockaddr_storage ss1, ss2; + int res; + pthread_t recv_thread = 0; + int *thread_err; + int datafd; + int8_t channel; + int seconds = 90; // dynamic tests take longer than normal tests + + if (is_memcheck() || is_helgrind()) { + printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n"); + seconds = seconds * 16; + } + + memset(knet_h, 0, sizeof(knet_h)); + memset(reply_pipe, 0, sizeof(reply_pipe)); + memset(logfds, 0, sizeof(logfds)); + + FAIL_ON_ERR(pipe(reply_pipe)); + + // Initial setup gubbins + msgs_recvd = 0; + setup_logpipes(logfds); + start_logthread(logfds[1], stdout); + knet_handle_start_nodes(knet_h, TESTNODES, logfds, KNET_LOG_DEBUG); + + FAIL_ON_ERR(knet_host_add(knet_h[2], 1)); + FAIL_ON_ERR(knet_host_add(knet_h[1], 2)); + + FAIL_ON_ERR(knet_handle_enable_filter(knet_h[2], NULL, dhost_filter)); + + // Create the dynamic (receiving) link + FAIL_ON_ERR(dyn_knet_link_set_config(knet_h[1], 2, 0, transport, 0, AF_INET, 1, &lo0, NULL)); + + // Connect to the dynamic link + FAIL_ON_ERR(dyn_knet_link_set_config(knet_h[2], 1, 0, transport, 0, AF_INET, 0, &lo1, &lo0)); + + // All the rest of the setup gubbins + FAIL_ON_ERR(knet_handle_enable_sock_notify(knet_h[1], 0, ¬ify_fn)); + FAIL_ON_ERR(knet_handle_enable_sock_notify(knet_h[2], 0, ¬ify_fn)); + + channel = datafd = 0; + FAIL_ON_ERR(knet_handle_add_datafd(knet_h[1], &datafd, &channel)); + channel = datafd = 0; + FAIL_ON_ERR(knet_handle_add_datafd(knet_h[2], &datafd, &channel)); + + FAIL_ON_ERR(knet_link_set_enable(knet_h[1], 2, 0, 1)); + FAIL_ON_ERR(knet_link_set_enable(knet_h[2], 1, 0, 1)); + + FAIL_ON_ERR(knet_handle_setfwd(knet_h[1], 1)); + FAIL_ON_ERR(knet_handle_setfwd(knet_h[2], 1)); + + // Start receive thread + FAIL_ON_ERR(pthread_create(&recv_thread, NULL, recv_messages, (void *)knet_h[1])); + + // Let everything settle down + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); + + /* + * TESTING STARTS HERE + * strings starting '1' should reach the receiving thread + * strings starting '0' should not + */ + + // No ACL + printf("Testing No ACL - this should get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "1No ACL - this should get through")); + FAIL_ON_ERR(wait_for_reply(seconds)) + + // Block traffic from this address. + memset(&ss1, 0, sizeof(ss1)); + memset(&ss2, 0, sizeof(ss1)); + knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1)); + FAIL_ON_ERR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT)); + // Accept ACL for when we remove them + FAIL_ON_ERR(knet_link_add_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_ACCEPT)); + + // This needs to go after the first ACLs are added + FAIL_ON_ERR(knet_handle_enable_access_lists(knet_h[1], 1)); + + printf("Testing Address blocked - this should NOT get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "0Address blocked - this should NOT get through")); + + // Unblock and check again + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, NULL, CHECK_TYPE_ADDRESS, CHECK_REJECT)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); + + printf("Testing Address unblocked - this should get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "1Address unblocked - this should get through")); + FAIL_ON_ERR(wait_for_reply(seconds)); + + // Block traffic using a netmask + knet_strtoaddr("127.0.0.1","0", &ss1, sizeof(ss1)); + knet_strtoaddr("255.0.0.1","0", &ss2, sizeof(ss2)); + FAIL_ON_ERR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT)); + + printf("Testing Netmask blocked - this should NOT get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "0Netmask blocked - this should NOT get through")); + + // Unblock and check again + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_MASK, CHECK_REJECT)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); + + printf("Testing Netmask unblocked - this should get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "1Netmask unblocked - this should get through")); + FAIL_ON_ERR(wait_for_reply(seconds)); + + // Block traffic from a range + knet_strtoaddr("127.0.0.0", "0", &ss1, sizeof(ss1)); + knet_strtoaddr("127.0.0.9", "0", &ss2, sizeof(ss2)); + FAIL_ON_ERR(knet_link_insert_acl(knet_h[1], 2, 0, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT)); + + printf("Testing Range blocked - this should NOT get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "0Range blocked - this should NOT get through")); + + // Unblock and check again + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 0, seconds, logfds[0], stdout)); + FAIL_ON_ERR(knet_link_rm_acl(knet_h[1], 2, 0, &ss1, &ss2, CHECK_TYPE_RANGE, CHECK_REJECT)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); + + printf("Testing Range unblocked - this should get through\n"); + FAIL_ON_ERR(knet_send_str(knet_h[2], "1Range unblocked - this should get through")); + FAIL_ON_ERR(wait_for_reply(seconds)); + + // Finish up - disable ACLS to make sure the QUIT message gets through + FAIL_ON_ERR(knet_handle_enable_access_lists(knet_h[1], 0)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[1], TESTNODES, 1, seconds, logfds[0], stdout)); + FAIL_ON_ERR(wait_for_nodes_state(knet_h[2], TESTNODES, 1, seconds, logfds[0], stdout)); + + FAIL_ON_ERR(knet_send_str(knet_h[2], "QUIT")); + + // Check return from the receiving thread + pthread_join(recv_thread, (void**)&thread_err); + if (*thread_err) { + printf("Thread returned %d\n", *thread_err); + exit(FAIL); + } + + // Tidy Up + knet_handle_stop_nodes(knet_h, TESTNODES); + + stop_logthread(); + flush_logs(logfds[0], stdout); + close_logpipes(logfds); + close(reply_pipe[0]); + close(reply_pipe[1]); + + if (msgs_recvd != CORRECT_NUM_MSGS) { + printf("*** FAIL Recv thread got %d messages, expected %d\n", msgs_recvd, CORRECT_NUM_MSGS); + exit(FAIL); + } +} + +int main(int argc, char *argv[]) +{ + printf("Testing with UDP\n"); + test(KNET_TRANSPORT_UDP); + +#ifdef HAVE_NETINET_SCTP_H + printf("Testing with SCTP currently disabled\n"); + //test(KNET_TRANSPORT_SCTP); +#endif + + return PASS; +} diff --git a/libknet/tests/test-common.c b/libknet/tests/test-common.c index 4e0c827f..335a22fe 100644 --- a/libknet/tests/test-common.c +++ b/libknet/tests/test-common.c @@ -1,969 +1,980 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Author: Fabio M. Di Nitto * * This software licensed under GPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include "libknet.h" #include "test-common.h" static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; static int log_init = 0; static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_t log_thread; static int log_thread_init = 0; static int log_fds[2]; struct log_thread_data { int logfd; FILE *std; }; static struct log_thread_data data; static char plugin_path[PATH_MAX]; static int _read_pipe(int fd, char **file, size_t *length) { char buf[4096]; int n; int done = 0; *file = NULL; *length = 0; memset(buf, 0, sizeof(buf)); while (!done) { n = read(fd, buf, sizeof(buf)); if (n < 0) { if (errno == EINTR) continue; if (*file) free(*file); return n; } if (n == 0 && (!*length)) return 0; if (n == 0) done = 1; if (*file) *file = realloc(*file, (*length) + n + done); else *file = malloc(n + done); if (!*file) return -1; memmove((*file) + (*length), buf, n); *length += (done + n); } /* Null terminator */ (*file)[(*length) - 1] = 0; return 0; } int execute_shell(const char *command, char **error_string) { pid_t pid; int status, err = 0; int fd[2]; size_t size = 0; if ((command == NULL) || (!error_string)) { errno = EINVAL; return FAIL; } *error_string = NULL; err = pipe(fd); if (err) goto out_clean; pid = fork(); if (pid < 0) { err = pid; goto out_clean; } if (pid) { /* parent */ close(fd[1]); err = _read_pipe(fd[0], error_string, &size); if (err) goto out_clean0; waitpid(pid, &status, 0); if (!WIFEXITED(status)) { err = -1; goto out_clean0; } if (WIFEXITED(status) && WEXITSTATUS(status) != 0) { err = WEXITSTATUS(status); goto out_clean0; } goto out_clean0; } else { /* child */ close(0); close(1); close(2); close(fd[0]); dup2(fd[1], 1); dup2(fd[1], 2); close(fd[1]); execlp("/bin/sh", "/bin/sh", "-c", command, NULL); exit(FAIL); } out_clean: close(fd[1]); out_clean0: close(fd[0]); return err; } int is_memcheck(void) { char *val; val = getenv("KNETMEMCHECK"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } int is_helgrind(void) { char *val; val = getenv("KNETHELGRIND"); if (val) { if (!strncmp(val, "yes", 3)) { return 1; } } return 0; } void set_scheduler(int policy) { struct sched_param sched_param; int err; err = sched_get_priority_max(policy); if (err < 0) { printf("Could not get maximum scheduler priority\n"); exit(FAIL); } sched_param.sched_priority = err; err = sched_setscheduler(0, policy, &sched_param); if (err < 0) { printf("Could not set priority\n"); exit(FAIL); } return; } int setup_logpipes(int *logfds) { if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) { printf("Unable to setup logging pipe\n"); exit(FAIL); } return PASS; } void close_logpipes(int *logfds) { close(logfds[0]); logfds[0] = 0; close(logfds[1]); logfds[1] = 0; } void flush_logs(int logfd, FILE *std) { struct knet_log_msg msg; int len; while (1) { len = read(logfd, &msg, sizeof(msg)); if (len != sizeof(msg)) { /* * clear errno to avoid incorrect propagation */ errno = 0; return; } if (!msg.knet_h) { /* * this is harsh but this function is void * and it is used also inside log_thread. * this is the easiest to get out with an error */ fprintf(std, "NO HANDLE INFO IN LOG MSG!!\n"); abort(); } msg.msg[sizeof(msg.msg) - 1] = 0; fprintf(std, "[knet: %p]: [%s] %s: %.*s\n", msg.knet_h, knet_log_get_loglevel_name(msg.msglevel), knet_log_get_subsystem_name(msg.subsystem), KNET_MAX_LOG_MSG_SIZE, msg.msg); } } static void *_logthread(void *args) { while (1) { int num; struct timeval tv = { 60, 0 }; fd_set rfds; FD_ZERO(&rfds); FD_SET(data.logfd, &rfds); num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv); if (num < 0) { fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n"); return NULL; } if (num == 0) { fprintf(data.std, "[knet]: No logs in the last 60 seconds\n"); continue; } if (FD_ISSET(data.logfd, &rfds)) { flush_logs(data.logfd, data.std); } } } int start_logthread(int logfd, FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { printf("Unable to get log_thread mutex lock\n"); return -1; } if (!log_thread_init) { data.logfd = logfd; data.std = std; savederrno = pthread_create(&log_thread, 0, _logthread, NULL); if (savederrno) { printf("Unable to start logging thread: %s\n", strerror(savederrno)); pthread_mutex_unlock(&log_thread_mutex); return -1; } log_thread_init = 1; } pthread_mutex_unlock(&log_thread_mutex); return 0; } int stop_logthread(void) { int savederrno = 0; void *retval; savederrno = pthread_mutex_lock(&log_thread_mutex); if (savederrno) { printf("Unable to get log_thread mutex lock\n"); return -1; } if (log_thread_init) { pthread_cancel(log_thread); pthread_join(log_thread, &retval); log_thread_init = 0; } pthread_mutex_unlock(&log_thread_mutex); return 0; } static void stop_logging(void) { stop_logthread(); flush_logs(log_fds[0], stdout); close_logpipes(log_fds); } int start_logging(FILE *std) { int savederrno = 0; savederrno = pthread_mutex_lock(&log_mutex); if (savederrno) { printf("Unable to get log_mutex lock\n"); return -1; } if (!log_init) { setup_logpipes(log_fds); if (atexit(&stop_logging) != 0) { printf("Unable to register atexit handler to stop logging: %s\n", strerror(errno)); exit(FAIL); } if (start_logthread(log_fds[0], std) < 0) { exit(FAIL); } log_init = 1; } pthread_mutex_unlock(&log_mutex); return log_fds[1]; } static int dir_filter(const struct dirent *dname) { if ( (strcmp(dname->d_name + strlen(dname->d_name)-3, ".so") == 0) && ((strncmp(dname->d_name,"crypto", 6) == 0) || (strncmp(dname->d_name,"compress", 8) == 0))) { return 1; } return 0; } /* Make sure the proposed plugin path has at least 1 of each plugin available - just as a sanity check really */ static int contains_plugins(char *path) { struct dirent **namelist; int n,i; size_t j; struct knet_compress_info compress_list[256]; struct knet_crypto_info crypto_list[256]; size_t num_compress, num_crypto; size_t compress_found = 0; size_t crypto_found = 0; if (knet_get_compress_list(compress_list, &num_compress) == -1) { return 0; } if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) { return 0; } n = scandir(path, &namelist, dir_filter, alphasort); if (n == -1) { return 0; } /* Look for plugins in the list */ for (i=0; id_name) >= 7 && strncmp(crypto_list[j].name, namelist[i]->d_name+7, strlen(crypto_list[j].name)) == 0) { crypto_found++; } } for (j=0; jd_name) >= 9 && strncmp(compress_list[j].name, namelist[i]->d_name+9, strlen(compress_list[j].name)) == 0) { compress_found++; } } free(namelist[i]); } free(namelist); /* If at least one plugin was found (or none were built) */ if ((crypto_found || num_crypto == 0) && (compress_found || num_compress == 0)) { return 1; } else { return 0; } } /* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */ char *find_plugins_path(void) { char *ld_libs_env = getenv("LD_LIBRARY_PATH"); if (ld_libs_env) { char *ld_libs = strdup(ld_libs_env); char *str = strtok(ld_libs, ":"); while (str) { if (contains_plugins(str)) { strncpy(plugin_path, str, sizeof(plugin_path)-1); free(ld_libs); printf("Using plugins from %s\n", plugin_path); return plugin_path; } str = strtok(NULL, ":"); } free(ld_libs); } return NULL; } knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level) { knet_handle_t knet_h = knet_handle_new(1, logfds[1], log_level, 0); char *plugins_path; if (knet_h) { printf("knet_handle_new at %p\n", knet_h); plugins_path = find_plugins_path(); /* Use plugins from the build tree */ if (plugins_path) { knet_h->plugin_path = plugins_path; } return knet_h; } else { printf("knet_handle_new failed: %s\n", strerror(errno)); flush_logs(logfds[0], stdout); close_logpipes(logfds); exit(FAIL); } } int knet_handle_reconnect_links(knet_handle_t knet_h) { size_t i, j; knet_node_id_t host_ids[KNET_MAX_HOST]; uint8_t link_ids[KNET_MAX_LINK]; size_t host_ids_entries = 0, link_ids_entries = 0; unsigned int enabled; if (!knet_h) { errno = EINVAL; return -1; } if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) { printf("knet_host_get_host_list failed: %s\n", strerror(errno)); return -1; } for (i = 0; i < host_ids_entries; i++) { if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) { printf("knet_link_get_link_list failed: %s\n", strerror(errno)); return -1; } for (j = 0; j < link_ids_entries; j++) { if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) { printf("knet_link_get_enable failed: %s\n", strerror(errno)); return -1; } if (!enabled) { if (knet_link_set_enable(knet_h, host_ids[i], j, 1)) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); return -1; } } } } return 0; } int knet_handle_disconnect_links(knet_handle_t knet_h) { size_t i, j; knet_node_id_t host_ids[KNET_MAX_HOST]; uint8_t link_ids[KNET_MAX_LINK]; size_t host_ids_entries = 0, link_ids_entries = 0; unsigned int enabled; if (!knet_h) { errno = EINVAL; return -1; } if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) { printf("knet_host_get_host_list failed: %s\n", strerror(errno)); return -1; } for (i = 0; i < host_ids_entries; i++) { if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) { printf("knet_link_get_link_list failed: %s\n", strerror(errno)); return -1; } for (j = 0; j < link_ids_entries; j++) { if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) { printf("knet_link_get_enable failed: %s\n", strerror(errno)); return -1; } if (enabled) { if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); return -1; } } } } return 0; } int knet_handle_stop(knet_handle_t knet_h) { size_t i, j; knet_node_id_t host_ids[KNET_MAX_HOST]; uint8_t link_ids[KNET_MAX_LINK]; size_t host_ids_entries = 0, link_ids_entries = 0; unsigned int enabled; if (!knet_h) { errno = EINVAL; return -1; } if (knet_handle_setfwd(knet_h, 0) < 0) { printf("knet_handle_setfwd failed: %s\n", strerror(errno)); return -1; } if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) { printf("knet_host_get_host_list failed: %s\n", strerror(errno)); return -1; } for (i = 0; i < host_ids_entries; i++) { if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) { printf("knet_link_get_link_list failed: %s\n", strerror(errno)); return -1; } for (j = 0; j < link_ids_entries; j++) { if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) { printf("knet_link_get_enable failed: %s\n", strerror(errno)); return -1; } if (enabled) { if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) { printf("knet_link_set_enable failed: %s\n", strerror(errno)); return -1; } } printf("clearing config for: %p host: %u link: %zu\n", knet_h, host_ids[i], j); knet_link_clear_config(knet_h, host_ids[i], j); } if (knet_host_remove(knet_h, host_ids[i]) < 0) { printf("knet_host_remove failed: %s\n", strerror(errno)); return -1; } } if (knet_handle_free(knet_h)) { printf("knet_handle_free failed: %s\n", strerror(errno)); return -1; } return 0; } static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family) { in_port_t port; char portstr[32]; if (offset < 0) { /* * api_knet_link_set_config needs to access the API directly, but * it does not send any traffic, so it´s safe to ask the kernel * for a random port. */ port = 0; } else { /* Use the pid if we can. but makes sure its in a sensible range */ port = (getpid() + offset) % (65536-1024) + 1024; } sprintf(portstr, "%u", port); memset(lo, 0, sizeof(struct sockaddr_storage)); printf("Using port %u\n", port); if (family == AF_INET6) { return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage)); } return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage)); } int make_local_sockaddr(struct sockaddr_storage *lo, int offset) { return _make_local_sockaddr(lo, offset, AF_INET); } int make_local_sockaddr6(struct sockaddr_storage *lo, int offset) { return _make_local_sockaddr(lo, offset, AF_INET6); } int _knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id, uint8_t transport, uint64_t flags, int family, int dynamic, struct sockaddr_storage *lo) { int err = 0, savederrno = 0; uint32_t port; char portstr[32]; for (port = 1025; port < 65536; port++) { sprintf(portstr, "%u", port); memset(lo, 0, sizeof(struct sockaddr_storage)); if (family == AF_INET6) { err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage)); } else { err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage)); } if (err < 0) { printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno)); goto out; } errno = 0; if (dynamic) { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags); } else { err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags); } savederrno = errno; if ((err < 0) && (savederrno != EADDRINUSE)) { printf("Unable to configure link: %s\n", strerror(savederrno)); goto out; } if (!err) { printf("Using port %u\n", port); goto out; } } if (err) { printf("No more ports available\n"); } out: errno = savederrno; return err; } void test_sleep(knet_handle_t knet_h, int seconds) { if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting sleep timers\n"); seconds = seconds * 16; } sleep(seconds); } int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd, FILE *std) { fd_set rfds; struct timeval tv; int err = 0, i = 0; if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n"); seconds = seconds * 16; } try_again: FD_ZERO(&rfds); FD_SET(datafd, &rfds); tv.tv_sec = 1; tv.tv_usec = 0; err = select(datafd+1, &rfds, NULL, NULL, &tv); /* * on slow arches the first call to select can return 0. * pick an arbitrary 10 times loop (multiplied by waiting seconds) * before failing. */ if ((!err) && (i < seconds)) { flush_logs(logfd, std); i++; goto try_again; } if ((err > 0) && (FD_ISSET(datafd, &rfds))) { return 0; } errno = ETIMEDOUT; return -1; } /* * functional tests helpers */ void knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfds[2], uint8_t log_level) { uint8_t i; char *plugins_path = find_plugins_path(); for (i = 1; i <= numnodes; i++) { knet_h[i] = knet_handle_new(i, logfds[1], log_level, 0); if (!knet_h[i]) { printf("failed to create handle: %s\n", strerror(errno)); break; } else { printf("knet_h[%u] at %p\n", i, knet_h[i]); } /* Use plugins from the build tree */ if (plugins_path) { knet_h[i]->plugin_path = plugins_path; } } if (i < numnodes) { knet_handle_stop_nodes(knet_h, i); exit(FAIL); } return; } void knet_handle_stop_nodes(knet_handle_t knet_h[], uint8_t numnodes) { uint8_t i; for (i = 1; i <= numnodes; i++) { - printf("stopping handle %u at %p\n", i, knet_h[i]); - knet_handle_stop(knet_h[i]); + if (knet_h[i]) { + printf("stopping handle %u at %p\n", i, knet_h[i]); + knet_handle_stop(knet_h[i]); + } } return; } void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport) { uint8_t i, x, j; struct sockaddr_storage src, dst; int offset = 0; int res; for (i = 1; i <= numnodes; i++) { for (j = 1; j <= numnodes; j++) { /* * don´t connect to itself */ if (j == i) { continue; } printf("host %u adding host: %u\n", i, j); if (knet_host_add(knet_h[i], j) < 0) { printf("Unable to add host: %s\n", strerror(errno)); knet_handle_stop_nodes(knet_h, numnodes); exit(FAIL); } for (x = 0; x < numlinks; x++) { res = -1; offset = 0; while (i + x + offset++ < 65535 && res != 0) { if (_make_local_sockaddr(&src, i + x + offset, family) < 0) { printf("Unable to convert src to sockaddr: %s\n", strerror(errno)); knet_handle_stop_nodes(knet_h, numnodes); exit(FAIL); } if (_make_local_sockaddr(&dst, j + x + offset, family) < 0) { printf("Unable to convert dst to sockaddr: %s\n", strerror(errno)); knet_handle_stop_nodes(knet_h, numnodes); exit(FAIL); } res = knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0); } printf("joining node %u with node %u via link %u src offset: %u dst offset: %u\n", i, j, x, i+x, j+x); if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) { printf("unable to enable link: %s\n", strerror(errno)); knet_handle_stop_nodes(knet_h, numnodes); exit(FAIL); } } } } for (i = 1; i <= numnodes; i++) { wait_for_nodes_state(knet_h[i], numnodes, 1, 600, knet_h[1]->logfd, stdout); } return; } static int target=0; static pthread_mutex_t wait_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t wait_cond = PTHREAD_COND_INITIALIZER; static int count_nodes(knet_handle_t knet_h) { int nodes = 0; int i; for (i=0; i< KNET_MAX_HOST; i++) { if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) { nodes++; } } return nodes; } static void nodes_notify_callback(void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external) { knet_handle_t knet_h = (knet_handle_t) private_data; int nodes; nodes = count_nodes(knet_h); if (nodes == target) { pthread_cond_signal(&wait_cond); } } static void host_notify_callback(void *private_data, knet_node_id_t host_id, uint8_t reachable, uint8_t remote, uint8_t external) { knet_handle_t knet_h = (knet_handle_t) private_data; if (knet_h->host_index[host_id]->status.reachable == 1) { pthread_cond_signal(&wait_cond); } } /* Wait for a cluster of 'numnodes' to come up/go down */ int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes, uint8_t state, uint32_t timeout, int logfd, FILE *std) { struct timespec ts; - int res; + int res, savederrno = 0; if (state) { target = numnodes-1; /* exclude us */ } else { target = 0; /* Wait for all to go down */ } /* Set this before checking existing status or there's a race condition */ knet_host_enable_status_change_notify(knet_h, (void *)(long)knet_h, nodes_notify_callback); /* Check we haven't already got all the nodes in the correct state */ if (count_nodes(knet_h) == target) { fprintf(stderr, "target already reached\n"); knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL); flush_logs(logfd, std); return 0; } clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += timeout; if (pthread_mutex_lock(&wait_mutex)) { fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno)); return -1; } + res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts); - if (res == -1 && errno == ETIMEDOUT) { + if (res != 0 && res != ETIMEDOUT) { + fprintf(stderr, "pthread_cond_timedwait fatal error\n"); + errno = res; + return -1; + } + if (res == ETIMEDOUT) { fprintf(stderr, "Timed-out\n"); + savederrno = ETIMEDOUT; + res = -1; } pthread_mutex_unlock(&wait_mutex); knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL); flush_logs(logfd, std); + errno = savederrno; return res; } /* Wait for a single node to come up */ int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std) { int res; struct timespec ts; if (is_memcheck() || is_helgrind()) { printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n"); seconds = seconds * 16; } /* Set this before checking existing status or there's a race condition */ knet_host_enable_status_change_notify(knet_h, (void *)(long)knet_h, host_notify_callback); /* Check it's not already reachable */ if (knet_h->host_index[host_id]->status.reachable == 1) { knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL); flush_logs(logfd, std); return 0; } clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += seconds; if (pthread_mutex_lock(&wait_mutex)) { fprintf(stderr, "unable to get nodewait mutex: %s\n", strerror(errno)); return -1; } res = pthread_cond_timedwait(&wait_cond, &wait_mutex, &ts); if (res == -1 && errno == ETIMEDOUT) { fprintf(stderr, "Timed-out\n"); knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL); pthread_mutex_unlock(&wait_mutex); flush_logs(logfd, std); return -1; } pthread_mutex_unlock(&wait_mutex); knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL); /* Still wait for it to settle */ flush_logs(logfd, std); test_sleep(knet_h, 1); return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 7a097475..fcc384d2 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -1,956 +1,967 @@ /* * Copyright (C) 2012-2021 Red Hat, Inc. All rights reserved. * * Authors: Fabio M. Di Nitto * Federico Simoncelli * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include "compat.h" #include "compress.h" #include "crypto.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "logging.h" #include "transports.h" #include "transport_common.h" #include "threads_common.h" #include "threads_heartbeat.h" #include "threads_pmtud.h" #include "threads_rx.h" #include "netutils.h" #include "onwire_v1.h" /* * RECV */ /* * return 1 if a > b * return -1 if b > a * return 0 if they are equal */ static inline int _timecmp(struct timespec a, struct timespec b) { if (a.tv_sec != b.tv_sec) { if (a.tv_sec > b.tv_sec) { return 1; } else { return -1; } } else { if (a.tv_nsec > b.tv_nsec) { return 1; } else if (a.tv_nsec < b.tv_nsec) { return -1; } else { return 0; } } } /* * this functions needs to return an index (0 to 7) * to a knet_host_defrag_buf. (-1 on errors) */ static int _find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num) { int i, oldest; /* * check if there is a buffer already in use handling the same seq_num */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (src_host->defrag_buf[i].in_use) { if (src_host->defrag_buf[i].pckt_seq == seq_num) { return i; } } } /* * If there is no buffer that's handling the current seq_num * either it's new or it's been reclaimed already. * check if it's been reclaimed/seen before using the defrag circular * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ if (!_seq_num_lookup(knet_h, src_host, seq_num, 1, 0)) { errno = ETIME; return -1; } /* * register the pckt as seen */ _seq_num_set(src_host, seq_num, 1); /* * see if there is a free buffer */ for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (!src_host->defrag_buf[i].in_use) { return i; } } /* * at this point, there are no free buffers, the pckt is new * and we need to reclaim a buffer, and we will take the one * with the oldest timestamp. It's as good as any. */ oldest = 0; for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) { if (_timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) { oldest = i; } } src_host->defrag_buf[oldest].in_use = 0; return oldest; } static int _pckt_defrag(knet_handle_t knet_h, struct knet_host *src_host, seq_num_t seq_num, unsigned char *data, ssize_t *len, uint8_t frags, uint8_t frag_seq) { struct knet_host_defrag_buf *defrag_buf; int defrag_buf_idx; defrag_buf_idx = _find_pckt_defrag_buf(knet_h, src_host, seq_num); if (defrag_buf_idx < 0) { return 1; } defrag_buf = &src_host->defrag_buf[defrag_buf_idx]; /* * if the buf is not is use, then make sure it's clean */ if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; defrag_buf->pckt_seq = seq_num; } /* * update timestamp on the buffer */ clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update); /* * check if we already received this fragment */ if (defrag_buf->frag_map[frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet */ return 1; } /* * we need to handle the last packet with gloves due to its different size */ if (frag_seq == frags) { defrag_buf->last_frag_size = *len; /* * in the event when the last packet arrives first, * we still don't know the offset vs the other fragments (based on MTU), * so we store the fragment at the end of the buffer where it's safe * and take a copy of the len so that we can restore its offset later. * remember we can't use the local MTU for this calculation because pMTU * can be asymettric between the same hosts. */ if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), data, *len); } } else { defrag_buf->frag_size = *len; } if (defrag_buf->frag_size) { memmove(defrag_buf->buf + ((frag_seq - 1) * defrag_buf->frag_size), data, *len); } defrag_buf->frag_recv++; defrag_buf->frag_map[frag_seq] = 1; /* * check if we received all the fragments */ if (defrag_buf->frag_recv == frags) { /* * special case the last pckt */ if (defrag_buf->last_first) { memmove(defrag_buf->buf + ((frags - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } /* * recalculate packet lenght */ *len = ((frags - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ memmove(data, defrag_buf->buf, *len); /* * free this buffer */ defrag_buf->in_use = 0; return 0; } return 1; } static int _handle_data_stats(knet_handle_t knet_h, struct knet_link *src_link, ssize_t len, uint64_t decrypt_time) { int stats_err; /* data stats at the top for consistency with TX */ src_link->status.stats.rx_data_packets++; src_link->status.stats.rx_data_bytes += len; if (decrypt_time) { stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } /* Only update the crypto overhead for data packets. Mainly to be consistent with TX */ if (decrypt_time < knet_h->stats.rx_crypt_time_min) { knet_h->stats.rx_crypt_time_min = decrypt_time; } if (decrypt_time > knet_h->stats.rx_crypt_time_max) { knet_h->stats.rx_crypt_time_max = decrypt_time; } knet_h->stats.rx_crypt_time_ave = (knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets + decrypt_time) / (knet_h->stats.rx_crypt_packets+1); knet_h->stats.rx_crypt_packets++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _decompress_data(knet_handle_t knet_h, uint8_t decompress_type, unsigned char *data, ssize_t *len, ssize_t header_size) { int err = 0, stats_err = 0; if (decompress_type) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t decompress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); err = decompress(knet_h, decompress_type, data, *len - header_size, knet_h->recv_from_links_buf_decompress, &decmp_outlen); clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, &decompress_time); stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex); if (stats_err < 0) { log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err)); return -1; } if (!err) { /* Collect stats */ if (decompress_time < knet_h->stats.rx_compress_time_min) { knet_h->stats.rx_compress_time_min = decompress_time; } if (decompress_time > knet_h->stats.rx_compress_time_max) { knet_h->stats.rx_compress_time_max = decompress_time; } knet_h->stats.rx_compress_time_ave = (knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets + decompress_time) / (knet_h->stats.rx_compressed_packets+1); knet_h->stats.rx_compressed_packets++; knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += *len - KNET_HEADER_SIZE; memmove(data, knet_h->recv_from_links_buf_decompress, decmp_outlen); *len = decmp_outlen + header_size; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); log_warn(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s", err, strerror(errno)); return -1; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); } return 0; } static int _check_destination(knet_handle_t knet_h, struct knet_header *inbuf, unsigned char *data, ssize_t len, ssize_t header_size, int8_t *channel) { knet_node_id_t dst_host_ids[KNET_MAX_HOST]; size_t dst_host_ids_entries = 0; int bcast = 1; size_t host_idx; int found = 0; if (knet_h->dst_host_filter_fn) { bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, data, len - header_size, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, channel, dst_host_ids, &dst_host_ids_entries); if (bcast < 0) { log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast); return -1; } if ((!bcast) && (!dst_host_ids_entries)) { log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries"); return -1; } /* check if we are dst for this packet */ if (!bcast) { if (dst_host_ids_entries > KNET_MAX_HOST) { log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations"); return -1; } for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) { if (dst_host_ids[host_idx] == knet_h->host_id) { found = 1; break; } } if (!found) { log_debug(knet_h, KNET_SUB_RX, "Packet is not for us"); return -1; } } } return 0; } static int _deliver_data(knet_handle_t knet_h, unsigned char *data, ssize_t len, ssize_t header_size, int8_t channel) { struct iovec iov_out[1]; ssize_t outlen = 0; memset(iov_out, 0, sizeof(iov_out)); retry: iov_out[0].iov_base = (void *) data + outlen; iov_out[0].iov_len = len - (outlen + header_size); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { log_debug(knet_h, KNET_SUB_RX, "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n", iov_out[0].iov_len, outlen); goto retry; } if (outlen <= 0) { knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data, knet_h->sockfd[channel].sockfd[0], channel, KNET_NOTIFY_RX, outlen, errno); return -1; } if ((size_t)outlen != iov_out[0].iov_len) { return -1; } return 0; } static void _process_data(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf, ssize_t len, uint64_t decrypt_time) { int8_t channel; uint8_t decompress_type = 0; ssize_t header_size; seq_num_t seq_num; uint8_t frags, frag_seq; unsigned char *data; if (_handle_data_stats(knet_h, src_link, len, decrypt_time) < 0) { return; } /* * register host is sending data. Required to determine if we need * to reset circular buffers. (see onwire_v1.c) */ src_host->got_data = 1; if (knet_h->onwire_ver_remap) { get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); } else { switch (inbuf->kh_version) { case 1: get_data_header_info_v1(knet_h, inbuf, &header_size, &channel, &seq_num, &decompress_type, &frags, &frag_seq); data = get_data_v1(knet_h, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "processing data onwire version %u not supported", inbuf->kh_version); return; break; } } if (!_seq_num_lookup(knet_h, src_host, seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } if (frags > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging * * the defrag code assumes that data packets have all the same size * except the last one that might be smaller. * */ len = len - header_size; if (_pckt_defrag(knet_h, src_host, seq_num, data, &len, frags, frag_seq)) { return; } len = len + header_size; } if (_decompress_data(knet_h, decompress_type, data, &len, header_size) < 0) { return; } if (!src_host->status.reachable) { log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id); return; } if (knet_h->enabled != 1) /* data forward is disabled */ return; if (_check_destination(knet_h, inbuf, data, len, header_size, &channel) < 0) { return; } if (!knet_h->sockfd[channel].in_use) { log_debug(knet_h, KNET_SUB_RX, "received packet for channel %d but there is no local sock connected", channel); return; } if (_deliver_data(knet_h, data, len, header_size, channel) < 0) { return; } _seq_num_set(src_host, seq_num, 0); } static struct knet_header *_decrypt_packet(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len, uint64_t *decrypt_time) { int try_decrypt = 0; int i = 0; struct timespec start_time; struct timespec end_time; ssize_t outlen; for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) { if (knet_h->crypto_instance[i]) { try_decrypt = 1; break; } } if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) { log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!"); return NULL; } if (try_decrypt) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_authenticate_and_decrypt(knet_h, (unsigned char *)inbuf, *len, knet_h->recv_from_links_buf_decrypt, &outlen) < 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet"); if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) { return NULL; } log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data"); } else { clock_gettime(CLOCK_MONOTONIC, &end_time); timespec_diff(start_time, end_time, decrypt_time); *len = outlen; inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt; } } return inbuf; } static int _packet_checks(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t len) { if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) { log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len); return -1; } /* * old versions of knet did not advertise max_ver and max_ver is set to 0. */ if (!inbuf->kh_max_ver) { inbuf->kh_max_ver = 1; } /* * if the node joining max version is lower than the min version * then we reject the node */ if (inbuf->kh_max_ver < knet_h->onwire_min_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently minimal supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version higher than our max version * then we reject the node */ if (inbuf->kh_version > knet_h->onwire_max_ver) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, higher than currently maximum supported onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } /* * if the node joining with version lower than the current in use version * then we reject the node * * NOTE: should we make this configurable and support downgrades? */ if ((!knet_h->onwire_force_ver) && (inbuf->kh_version < knet_h->onwire_ver) && (inbuf->kh_max_ver > inbuf->kh_version)) { log_warn(knet_h, KNET_SUB_RX, "Received packet version %u from node %u, lower than currently in use onwire version. Rejecting.", inbuf->kh_version, inbuf->kh_node); return -1; } return 0; } static void _handle_dynip(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, int sockfd, const struct knet_mmsghdr *msg) { if (src_link->dynamic == KNET_LINK_DYNIP) { if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) { log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address", src_host->host_id, src_link->link_id); memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage)); if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr), src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN, src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???"); snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!"); snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??"); } else { log_info(knet_h, KNET_SUB_RX, "host: %u link: %u new connection established from: %s %s", src_host->host_id, src_link->link_id, src_link->status.dst_ipaddr, src_link->status.dst_port); } } /* * transport has already accepted the connection here * otherwise we would not be receiving packets */ transport_link_dyn_connect(knet_h, sockfd, src_link); } } +/* + * processing incoming packets vs access lists + */ +static int _check_rx_acl(knet_handle_t knet_h, struct knet_link *src_link, const struct knet_mmsghdr *msg) +{ + if (knet_h->use_access_lists) { + if (!check_validate(knet_h, src_link, msg->msg_hdr.msg_name)) { + char src_ipaddr[KNET_MAX_HOST_LEN]; + char src_port[KNET_MAX_PORT_LEN]; + + memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); + memset(src_port, 0, KNET_MAX_PORT_LEN); + if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name), + src_ipaddr, KNET_MAX_HOST_LEN, + src_port, KNET_MAX_PORT_LEN) < 0) { + + log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); + } else { + log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); + } + return 0; + } + } + return 1; +} + static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg) { int savederrno = 0, stats_err = 0; struct knet_host *src_host; struct knet_link *src_link; uint64_t decrypt_time = 0; struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base; ssize_t len = msg->msg_len; int i, found_link = 0; inbuf = _decrypt_packet(knet_h, inbuf, &len, &decrypt_time); if (!inbuf) { return; } inbuf->kh_node = ntohs(inbuf->kh_node); if (_packet_checks(knet_h, inbuf, len) < 0) { return; } /* * determine source host */ src_host = knet_h->host_index[inbuf->kh_node]; if (src_host == NULL) { /* host not found */ log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet"); return; } /* * deteremine source link */ if (inbuf->kh_type == KNET_HEADER_TYPE_PING) { _handle_onwire_version(knet_h, src_host, inbuf); if (knet_h->onwire_ver_remap) { src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); } else { switch (inbuf->kh_version) { case 1: src_link = get_link_from_pong_v1(knet_h, src_host, inbuf); break; default: log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version); return; break; } } + if (!_check_rx_acl(knet_h, src_link, msg)) { + return; + } _handle_dynip(knet_h, src_host, src_link, sockfd, msg); } else { /* all other packets */ for (i = 0; i < KNET_MAX_LINK; i++) { src_link = &src_host->link[i]; if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) { found_link = 1; break; } } - if (!found_link) { + if (found_link) { + /* + * this check is currently redundant.. Keep it here for now + */ + if (!_check_rx_acl(knet_h, src_link, msg)) { + return; + } + } else { log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet."); return; } } stats_err = pthread_mutex_lock(&src_link->link_stats_mutex); if (stats_err) { log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s", src_host->host_id, src_link->link_id, strerror(savederrno)); return; } switch (inbuf->kh_type) { case KNET_HEADER_TYPE_DATA: _process_data(knet_h, src_host, src_link, inbuf, len, decrypt_time); break; case KNET_HEADER_TYPE_PING: process_ping(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PONG: process_pong(knet_h, src_host, src_link, inbuf, len); break; case KNET_HEADER_TYPE_PMTUD: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* Unlock so we don't deadlock with tx_mutex */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; /* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */ pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud_reply(knet_h, src_link, inbuf); return; break; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { int err, savederrno; int i, msg_recv, transport; if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) { log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { /* * this is normal if a fd got an event and before we grab the read lock * and the link is removed by another thread */ goto exit_unlock; } transport = knet_h->knet_transport_fd_tracker[sockfd].transport; /* * reset msg_namelen to buffer size because after recvmmsg * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6 */ for (i = 0; i < PCKT_RX_BUFS; i++) { msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len; } msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL); savederrno = errno; /* * WARNING: man page for recvmmsg is wrong. Kernel implementation here: * recvmmsg can return: * -1 on error * 0 if the previous run of recvmmsg recorded an error on the socket * N number of messages (see exception below). * * If there is an error from recvmsg after receiving a frame or more, the recvmmsg * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and * it will be visibile in the next run. * * Need to be careful how we handle errors at this stage. * * error messages need to be handled on a per transport/protocol base * at this point we have different layers of error handling * - msg_recv < 0 -> error from this run * msg_recv = 0 -> error from previous run and error on socket needs to be cleared * - per-transport message data * example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF, * but for UDP it is perfectly legal to receive a 0 bytes message.. go figure * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no * errno set. That means the error api needs to be able to abort the loop below. */ if (msg_recv <= 0) { transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno); goto exit_unlock; } for (i = 0; i < msg_recv; i++) { err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]); /* * TODO: make this section silent once we are confident * all protocols packet handlers are good */ switch(err) { case KNET_TRANSPORT_RX_ERROR: /* on error */ log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet"); goto exit_unlock; break; case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue"); break; case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */ log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop"); goto exit_unlock; break; case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */ - /* - * processing incoming packets vs access lists - */ - if ((knet_h->use_access_lists) && - (transport_get_acl_type(knet_h, transport) == USE_GENERIC_ACL)) { - if (!check_validate(knet_h, sockfd, transport, msg[i].msg_hdr.msg_name)) { - char src_ipaddr[KNET_MAX_HOST_LEN]; - char src_port[KNET_MAX_PORT_LEN]; - - memset(src_ipaddr, 0, KNET_MAX_HOST_LEN); - memset(src_port, 0, KNET_MAX_PORT_LEN); - if (knet_addrtostr(msg[i].msg_hdr.msg_name, sockaddr_len(msg[i].msg_hdr.msg_name), - src_ipaddr, KNET_MAX_HOST_LEN, - src_port, KNET_MAX_PORT_LEN) < 0) { - - log_debug(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port"); - } else { - log_debug(knet_h, KNET_SUB_RX, "Packet rejected from %s/%s", src_ipaddr, src_port); - } - /* - * continue processing the other packets - */ - continue; - } - } _parse_recv_from_links(knet_h, sockfd, &msg[i]); break; case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE: log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue"); break; case KNET_TRANSPORT_RX_OOB_DATA_STOP: log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop"); goto exit_unlock; break; } } exit_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } void *_handle_recv_from_links_thread(void *data) { int i, nev; knet_handle_t knet_h = (knet_handle_t) data; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; struct sockaddr_storage address[PCKT_RX_BUFS]; struct knet_mmsghdr msg[PCKT_RX_BUFS]; struct iovec iov_in[PCKT_RX_BUFS]; set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED); memset(&msg, 0, sizeof(msg)); memset(&events, 0, sizeof(events)); for (i = 0; i < PCKT_RX_BUFS; i++) { iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i]; iov_in[i].iov_len = KNET_DATABUFSIZE; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_name = &address[i]; msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */ msg[i].msg_hdr.msg_iov = &iov_in[i]; msg[i].msg_hdr.msg_iovlen = 1; } while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * the RX threads only need to notify that there has been at least * one successful run after queue flush has been requested. * See setfwd in handle.c */ if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) { set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED); } /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } for (i = 0; i < nev; i++) { _handle_recv_from_links(knet_h, events[i].data.fd, msg); } } set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED); return NULL; } ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel) { int savederrno = 0; ssize_t err = 0; struct iovec iov_in; if (!_is_valid_handle(knet_h)) { return -1; } if (buff == NULL) { errno = EINVAL; return -1; } if (buff_len <= 0) { errno = EINVAL; return -1; } if (buff_len > KNET_MAX_PACKET_SIZE) { errno = EINVAL; return -1; } if (channel < 0) { errno = EINVAL; return -1; } if (channel >= KNET_DATAFD_MAX) { errno = EINVAL; return -1; } savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock); if (savederrno) { log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s", strerror(savederrno)); errno = savederrno; return -1; } if (!knet_h->sockfd[channel].in_use) { savederrno = EINVAL; err = -1; goto out_unlock; } memset(&iov_in, 0, sizeof(iov_in)); iov_in.iov_base = (void *)buff; iov_in.iov_len = buff_len; err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1); savederrno = errno; out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); errno = err ? savederrno : 0; return err; } diff --git a/libknet/transport_sctp.c b/libknet/transport_sctp.c index c69cbebb..91d0d93c 100644 --- a/libknet/transport_sctp.c +++ b/libknet/transport_sctp.c @@ -1,1622 +1,1646 @@ /* * Copyright (C) 2016-2021 Red Hat, Inc. All rights reserved. * * Author: Christine Caulfield * * This software licensed under LGPL-2.0+ */ #include "config.h" #include #include #include #include #include #include #include #include #include "compat.h" #include "host.h" #include "links.h" #include "links_acl.h" #include "links_acl_ip.h" #include "logging.h" #include "netutils.h" #include "common.h" #include "transport_common.h" #include "transports.h" #include "threads_common.h" #ifdef HAVE_NETINET_SCTP_H #include #include "transport_sctp.h" typedef struct sctp_handle_info { struct qb_list_head listen_links_list; struct qb_list_head connect_links_list; int connect_epollfd; int connectsockfd[2]; int listen_epollfd; int listensockfd[2]; pthread_t connect_thread; pthread_t listen_thread; socklen_t event_subscribe_kernel_size; char *event_subscribe_buffer; } sctp_handle_info_t; /* * use by fd_tracker data type */ #define SCTP_NO_LINK_INFO 0 #define SCTP_LISTENER_LINK_INFO 1 #define SCTP_ACCEPTED_LINK_INFO 2 #define SCTP_CONNECT_LINK_INFO 3 /* * this value is per listener */ #define MAX_ACCEPTED_SOCKS 256 typedef struct sctp_listen_link_info { struct qb_list_head list; int listen_sock; int accepted_socks[MAX_ACCEPTED_SOCKS]; struct sockaddr_storage src_address; int on_listener_epoll; int on_rx_epoll; int sock_shutdown; } sctp_listen_link_info_t; typedef struct sctp_accepted_link_info { char mread_buf[KNET_DATABUFSIZE]; ssize_t mread_len; sctp_listen_link_info_t *link_info; } sctp_accepted_link_info_t ; typedef struct sctp_connect_link_info { struct qb_list_head list; sctp_listen_link_info_t *listener; struct knet_link *link; struct sockaddr_storage dst_address; int connect_sock; int on_rx_epoll; int close_sock; int sock_shutdown; } sctp_connect_link_info_t; /* * socket handling functions * * those functions do NOT perform locking. locking * should be handled in the right context from callers */ /* * sockets are removed from rx_epoll from callers * see also error handling functions */ static int _close_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; struct epoll_event ev; sctp_connect_link_info_t *info = kn_link->transport_link; if (info->connect_sock != -1) { if (info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->connect_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove connected socket from epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_rx_epoll = 0; } if (_set_fd_tracker(knet_h, info->connect_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); } else { close(info->connect_sock); info->connect_sock = -1; } } exit_error: errno = savederrno; return err; } static int _enable_sctp_notifications(knet_handle_t knet_h, int sock, const char *type) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; if (setsockopt(sock, IPPROTO_SCTP, SCTP_EVENTS, handle_info->event_subscribe_buffer, handle_info->event_subscribe_kernel_size) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to enable %s events: %s", type, strerror(savederrno)); } errno = savederrno; return err; } static int _configure_sctp_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, uint64_t flags, const char *type) { int err = 0, savederrno = 0; int value; int level; #ifdef SOL_SCTP level = SOL_SCTP; #else level = IPPROTO_SCTP; #endif if (_configure_transport_socket(knet_h, sock, address, flags, type) < 0) { savederrno = errno; err = -1; goto exit_error; } value = 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set reuseaddr on socket %d: %s", sock, strerror(savederrno)); goto exit_error; } value = 1; if (setsockopt(sock, level, SCTP_NODELAY, &value, sizeof(value)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSPORT, "Unable to set sctp nodelay: %s", strerror(savederrno)); goto exit_error; } if (_enable_sctp_notifications(knet_h, sock, type) < 0) { savederrno = errno; err = -1; } exit_error: errno = savederrno; return err; } static int _reconnect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info = kn_link->transport_link; if (connect(info->connect_sock, (struct sockaddr *)&kn_link->dst_addr, sockaddr_len(&kn_link->dst_addr)) < 0) { savederrno = errno; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP socket %d received error: %s", info->connect_sock, strerror(savederrno)); if ((savederrno != EALREADY) && (savederrno != EINPROGRESS) && (savederrno != EISCONN)) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to connect SCTP socket %d: %s", info->connect_sock, strerror(savederrno)); } } errno = savederrno; return err; } static int _create_connect_socket(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; struct epoll_event ev; sctp_connect_link_info_t *info = kn_link->transport_link; int connect_sock; struct sockaddr_storage connect_addr; connect_sock = socket(kn_link->dst_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (connect_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create send/recv socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, connect_sock, &kn_link->dst_addr, kn_link->flags, "SCTP connect") < 0) { savederrno = errno; err = -1; goto exit_error; } memset(&connect_addr, 0, sizeof(struct sockaddr_storage)); if (knet_strtoaddr(kn_link->status.src_ipaddr, "0", &connect_addr, sockaddr_len(&connect_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to resolve connecting socket: %s", strerror(savederrno)); goto exit_error; } if (bind(connect_sock, (struct sockaddr *)&connect_addr, sockaddr_len(&connect_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind connecting socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, connect_sock, KNET_TRANSPORT_SCTP, SCTP_CONNECT_LINK_INFO, sockaddr_len(&kn_link->src_addr), info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = connect_sock; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, connect_sock, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connected socket to epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 1; info->connect_sock = connect_sock; info->close_sock = 0; kn_link->outsock = info->connect_sock; if (_reconnect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; goto exit_error; } exit_error: if (err) { if (connect_sock >= 0) { close(connect_sock); } } errno = savederrno; return err; } static void _lock_sleep_relock(knet_handle_t knet_h) { int i = 0; /* Don't hold onto the lock while sleeping */ pthread_rwlock_unlock(&knet_h->global_rwlock); while (i < 5) { usleep(knet_h->threads_timer_res / 16); if (!pthread_rwlock_rdlock(&knet_h->global_rwlock)) { /* * lock acquired, we can go out */ return; } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get read lock!"); i++; } } /* * time to crash! if we cannot re-acquire the lock * there is no easy way out of this one */ assert(0); } int sctp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info; if (recv_err < 0) { switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: if (connect_info->link->transport_connected == 0) { return -1; } break; case SCTP_ACCEPTED_LINK_INFO: listen_info = accepted_info->link_info; if (listen_info->listen_sock != sockfd) { if (listen_info->on_rx_epoll == 0) { return -1; } } break; } if (recv_errno == EAGAIN) { #ifdef DEBUG log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Sock: %d is overloaded. Slowing TX down", sockfd); #endif _lock_sleep_relock(knet_h); return 1; } return -1; } return 0; } /* * socket error management functions * * both called with global read lock. * * NOTE: we need to remove the fd from the epoll as soon as possible * even before we notify the respective thread to take care of it * because scheduling can make it so that this thread will overload * and the threads supposed to take care of the error will never * be able to take action. * we CANNOT handle FDs here directly (close/reconnect/etc) due * to locking context. We need to delegate that to their respective * management threads within the global write lock. * * this function is called from: * - RX thread with recv_err <= 0 directly on recvmmsg error * - transport_rx_is_data when msg_len == 0 (recv_err = 1) * - transport_rx_is_data on notification (recv_err = 2) * * basically this small abuse of recv_err is to detect notifications * generated by sockets created by listen(). */ int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno) { struct epoll_event ev; sctp_accepted_link_info_t *accepted_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_listen_link_info_t *listen_info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; switch (knet_h->knet_transport_fd_tracker[sockfd].data_type) { case SCTP_CONNECT_LINK_INFO: /* * all connect link have notifications enabled * and we accept only data from notification and * generic recvmmsg errors. * * Errors generated by msg_len 0 can be ignored because * they follow a notification (double notification) */ if (recv_err != 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received an error", sockfd); if (sendto(handle_info->connectsockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno)); } } break; case SCTP_ACCEPTED_LINK_INFO: listen_info = accepted_info->link_info; if (listen_info->listen_sock != sockfd) { if (recv_err != 1) { if (listen_info->on_rx_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = sockfd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, sockfd, &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); return -1; } listen_info->on_rx_epoll = 0; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying listen thread that sockfd %d received an error", sockfd); if (sendto(handle_info->listensockfd[1], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify listen thread: %s", strerror(errno)); } } } else { /* * this means the listen() socket has generated * a notification. now what? :-) */ log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen() socket %d", sockfd); } break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received unknown notification? %d", sockfd); break; } /* * Under RX pressure we need to give time to IPC to pick up the message */ _lock_sleep_relock(knet_h); return 0; } /* * NOTE: sctp_transport_rx_is_data is called with global rdlock * delegate any FD error management to sctp_transport_rx_sock_error * and keep this code to parsing incoming data only */ int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg) { size_t i; struct iovec *iov = msg->msg_hdr.msg_iov; size_t iovlen = msg->msg_hdr.msg_iovlen; struct sctp_assoc_change *sac; union sctp_notification *snp; sctp_accepted_link_info_t *listen_info = knet_h->knet_transport_fd_tracker[sockfd].data; sctp_connect_link_info_t *connect_info = knet_h->knet_transport_fd_tracker[sockfd].data; if (!(msg->msg_hdr.msg_flags & MSG_NOTIFICATION)) { if (msg->msg_len == 0) { /* * NOTE: with event notification enabled, we receive error twice: * 1) from the event notification * 2) followed by a 0 byte msg_len * * the event handler should take care to avoid #2 by stopping * the rx thread from processing more packets than necessary. */ if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { if (connect_info->sock_shutdown) { return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } } else { if (listen_info->link_info->sock_shutdown) { return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } } /* * this is pretty much dead code and we should never hit it. * keep it for safety and avoid the rx thread to process * bad info / data. */ return KNET_TRANSPORT_RX_NOT_DATA_STOP; } /* * missing MSG_EOR has to be treated as a short read * from the socket and we need to fill in the mread buf * while we wait for MSG_EOR */ if (!(msg->msg_hdr.msg_flags & MSG_EOR)) { /* * copy the incoming data into mread_buf + mread_len (incremental) * and increase mread_len */ memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len); listen_info->mread_len = listen_info->mread_len + msg->msg_len; return KNET_TRANSPORT_RX_NOT_DATA_CONTINUE; } /* * got EOR. * if mread_len is > 0 we are completing a packet from short reads * complete reassembling the packet in mread_buf, copy it back in the iov * and set the iov/msg len numbers (size) correctly */ if (listen_info->mread_len) { /* * add last fragment to mread_buf */ memmove(listen_info->mread_buf + listen_info->mread_len, iov->iov_base, msg->msg_len); listen_info->mread_len = listen_info->mread_len + msg->msg_len; /* * move all back into the iovec */ memmove(iov->iov_base, listen_info->mread_buf, listen_info->mread_len); msg->msg_len = listen_info->mread_len; listen_info->mread_len = 0; } return KNET_TRANSPORT_RX_IS_DATA; } if (!(msg->msg_hdr.msg_flags & MSG_EOR)) { return KNET_TRANSPORT_RX_NOT_DATA_STOP; } for (i = 0; i < iovlen; i++) { snp = iov[i].iov_base; switch (snp->sn_header.sn_type) { case SCTP_ASSOC_CHANGE: sac = &snp->sn_assoc_change; switch (sac->sac_state) { case SCTP_COMM_LOST: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_lost", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->close_sock = 1; connect_info->link->transport_connected = 0; } sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); return KNET_TRANSPORT_RX_OOB_DATA_STOP; break; case SCTP_COMM_UP: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: comm_up", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->link->transport_connected = 1; } break; case SCTP_RESTART: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: restart", sockfd); break; case SCTP_SHUTDOWN_COMP: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: shutdown comp", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->close_sock = 1; } sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); return KNET_TRANSPORT_RX_OOB_DATA_STOP; break; case SCTP_CANT_STR_ASSOC: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: cant str assoc", sockfd); sctp_transport_rx_sock_error(knet_h, sockfd, 2, 0); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp assoc change socket %d: unknown %d", sockfd, sac->sac_state); break; } break; case SCTP_SHUTDOWN_EVENT: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp shutdown event socket %d", sockfd); if (knet_h->knet_transport_fd_tracker[sockfd].data_type == SCTP_CONNECT_LINK_INFO) { connect_info->link->transport_connected = 0; connect_info->sock_shutdown = 1; } else { listen_info->link_info->sock_shutdown = 1; } break; case SCTP_SEND_FAILED: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp send failed socket: %d", sockfd); break; case SCTP_PEER_ADDR_CHANGE: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp peer addr change socket %d", sockfd); break; case SCTP_REMOTE_ERROR: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] sctp remote error socket %d", sockfd); break; default: log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "[event] unknown sctp event socket: %d type: %hu", sockfd, snp->sn_header.sn_type); break; } } return KNET_TRANSPORT_RX_OOB_DATA_CONTINUE; } int sctp_transport_link_is_down(knet_handle_t knet_h, struct knet_link *kn_link) { sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *info = kn_link->transport_link; kn_link->transport_connected = 0; info->close_sock = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Notifying connect thread that sockfd %d received a link down event", info->connect_sock); if (sendto(handle_info->connectsockfd[1], &info->connect_sock, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to notify connect thread: %s", strerror(errno)); } return 0; } /* * connect / outgoing socket management thread */ /* * _handle_connected_sctp* are called with a global write lock * from the connect_thread */ static void _handle_connected_sctp_socket(knet_handle_t knet_h, int connect_sock) { int err; unsigned int status, len = sizeof(status); sctp_connect_link_info_t *info = knet_h->knet_transport_fd_tracker[connect_sock].data; struct knet_link *kn_link = info->link; if (info->close_sock) { if (_close_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close sock %d from _handle_connected_sctp_socket: %s", connect_sock, strerror(errno)); return; } info->close_sock = 0; if (_create_connect_socket(knet_h, kn_link) < 0) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to recreate connecting sock! %s", strerror(errno)); return; } } _reconnect_socket(knet_h, info->link); err = getsockopt(connect_sock, SOL_SOCKET, SO_ERROR, &status, &len); if (err) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP getsockopt() on connecting socket %d failed: %s", connect_sock, strerror(errno)); return; } if (status) { log_info(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect on %d to %s port %s failed: %s", connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port, strerror(status)); /* * No need to create a new socket if connect failed, * just retry connect */ return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP handler fd %d now connected to %s port %s", connect_sock, kn_link->status.dst_ipaddr, kn_link->status.dst_port); } static void _handle_connected_sctp_notifications(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; if (recv(handle_info->connectsockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on connectsockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for connected socket fd error"); return; } /* * revalidate sockfd */ if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) { return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing connected error on socket: %d", sockfd); _handle_connected_sctp_socket(knet_h, sockfd); } static void *_sctp_connect_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->connect_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP connect handler EPOLL ERROR: %s", strerror(errno)); continue; } /* * Sort out which FD has a connection */ savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * minor optimization: deduplicate events * * in some cases we can receive multiple notifcations * of the same FD having issues or need handling. * It's enough to process it once even tho it's safe * to handle them multiple times. */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->connectsockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for connected socket"); _handle_connected_sctp_notifications(knet_h); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification on connected sockfd %d\n", events[i].data.fd); } } pthread_rwlock_unlock(&knet_h->global_rwlock); /* * this thread can generate events for itself. * we need to sleep in between loops to allow other threads * to be scheduled */ usleep(knet_h->reconnect_int * 1000); } set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_STOPPED); return NULL; } /* * listen/incoming connections management thread */ /* * Listener received a new connection * called with a write lock from main thread */ static void _handle_incoming_sctp(knet_handle_t knet_h, int listen_sock) { int err = 0, savederrno = 0; int new_fd; int i = -1; sctp_listen_link_info_t *info = knet_h->knet_transport_fd_tracker[listen_sock].data; struct epoll_event ev; struct sockaddr_storage ss; socklen_t sock_len = sizeof(ss); char addr_str[KNET_MAX_HOST_LEN]; char port_str[KNET_MAX_PORT_LEN]; sctp_accepted_link_info_t *accept_info = NULL; + struct knet_host *host; + struct knet_link *kn_link; + int link_idx; + sctp_connect_link_info_t *this_link_connect_info; + sctp_listen_link_info_t *this_link_listen_info; + int pass_acl = 0; memset(&ss, 0, sizeof(ss)); new_fd = accept(listen_sock, (struct sockaddr *)&ss, &sock_len); if (new_fd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accept error: %s", strerror(errno)); goto exit_error; } if (knet_addrtostr(&ss, sizeof(ss), addr_str, KNET_MAX_HOST_LEN, port_str, KNET_MAX_PORT_LEN) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to gather socket info"); goto exit_error; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: received connection from: %s port: %s", addr_str, port_str); + if (knet_h->use_access_lists) { - if (!check_validate(knet_h, listen_sock, KNET_TRANSPORT_SCTP, &ss)) { + for (host = knet_h->host_head; host != NULL; host = host->next) { + for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { + kn_link = &host->link[link_idx]; + + if ((kn_link->configured) && (kn_link->transport == KNET_TRANSPORT_SCTP)) { + this_link_connect_info = kn_link->transport_link; + this_link_listen_info = this_link_connect_info->listener; + if ((this_link_listen_info->listen_sock == listen_sock) && + (check_validate(knet_h, kn_link, &ss))) { + pass_acl = 1; + break; + } + } + } + if (pass_acl) { + break; + } + } + if (!pass_acl) { savederrno = EINVAL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Connection rejected from %s/%s", addr_str, port_str); close(new_fd); errno = savederrno; return; } } /* * Keep a track of all accepted FDs */ for (i=0; iaccepted_socks[i] == -1) { info->accepted_socks[i] = new_fd; break; } } if (i == MAX_ACCEPTED_SOCKS) { errno = EBUSY; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: too many connections!"); goto exit_error; } if (_configure_common_socket(knet_h, new_fd, 0, "SCTP incoming") < 0) { /* Inherit flags from listener? */ savederrno = errno; err = -1; goto exit_error; } if (_enable_sctp_notifications(knet_h, new_fd, "Incoming connection") < 0) { savederrno = errno; err = -1; goto exit_error; } accept_info = malloc(sizeof(sctp_accepted_link_info_t)); if (!accept_info) { savederrno = errno; err = -1; goto exit_error; } memset(accept_info, 0, sizeof(sctp_accepted_link_info_t)); accept_info->link_info = info; if (_set_fd_tracker(knet_h, new_fd, KNET_TRANSPORT_SCTP, SCTP_ACCEPTED_LINK_INFO, knet_h->knet_transport_fd_tracker[listen_sock].sockaddr_len, accept_info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(errno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = new_fd; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_ADD, new_fd, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: unable to add accepted socket %d to epoll pool: %s", new_fd, strerror(errno)); goto exit_error; } info->on_rx_epoll = 1; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Incoming: accepted new fd %d for %s/%s (listen fd: %d). index: %d", new_fd, addr_str, port_str, info->listen_sock, i); exit_error: if (err) { if ((i >= 0) && (i < MAX_ACCEPTED_SOCKS)) { info->accepted_socks[i] = -1; } /* * check the error to make coverity scan happy. * _set_fd_tracker cannot fail at this stage */ if (_set_fd_tracker(knet_h, new_fd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0){ log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", new_fd); } free(accept_info); if (new_fd >= 0) { close(new_fd); } } errno = savederrno; return; } /* * Listen thread received a notification of a bad socket that needs closing * called with a write lock from main thread */ static void _handle_listen_sctp_errors(knet_handle_t knet_h) { int sockfd = -1; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_accepted_link_info_t *accept_info; sctp_listen_link_info_t *info; struct knet_host *host; int link_idx; int i; if (recv(handle_info->listensockfd[0], &sockfd, sizeof(int), MSG_DONTWAIT | MSG_NOSIGNAL) != sizeof(int)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Short read on listensockfd"); return; } if (_is_valid_fd(knet_h, sockfd) < 1) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received stray notification for listen socket fd error"); return; } /* * revalidate sockfd */ if ((sockfd < 0) || (sockfd >= KNET_MAX_FDS)) { return; } log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Processing listen error on socket: %d", sockfd); accept_info = knet_h->knet_transport_fd_tracker[sockfd].data; info = accept_info->link_info; /* * clear all links using this accepted socket as * outbound dynamically connected socket */ for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if ((host->link[link_idx].dynamic == KNET_LINK_DYNIP) && (host->link[link_idx].outsock == sockfd)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Found dynamic connection on host %d link %d (%d)", host->host_id, link_idx, sockfd); host->link[link_idx].status.dynconnected = 0; host->link[link_idx].transport_connected = 0; host->link[link_idx].outsock = 0; memset(&host->link[link_idx].dst_addr, 0, sizeof(struct sockaddr_storage)); } } } for (i=0; iaccepted_socks[i]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Closing accepted socket %d", sockfd); /* * check the error to make coverity scan happy. * _set_fd_tracker cannot fail at this stage */ if (_set_fd_tracker(knet_h, sockfd, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to update fdtracker for socket %d", sockfd); } info->accepted_socks[i] = -1; free(accept_info); close(sockfd); break; /* Keeps covscan happy */ } } } static void *_sctp_listen_thread(void *data) { int savederrno; int i, nev; knet_handle_t knet_h = (knet_handle_t) data; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; struct epoll_event events[KNET_EPOLL_MAX_EVENTS]; set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STARTED); memset(&events, 0, sizeof(events)); while (!shutdown_in_progress(knet_h)) { nev = epoll_wait(handle_info->listen_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000); /* * we use timeout to detect if thread is shutting down */ if (nev == 0) { continue; } if (nev < 0) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listen handler EPOLL ERROR: %s", strerror(errno)); continue; } savederrno = get_global_wrlock(knet_h); if (savederrno) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to get write lock: %s", strerror(savederrno)); continue; } /* * Sort out which FD has an incoming connection */ for (i = 0; i < nev; i++) { if (events[i].data.fd == handle_info->listensockfd[0]) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received notification from rx_error for listener/accepted socket"); _handle_listen_sctp_errors(knet_h); } else { if (_is_valid_fd(knet_h, events[i].data.fd) == 1) { _handle_incoming_sctp(knet_h, events[i].data.fd); } else { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Received listen notification from invalid socket"); } } } pthread_rwlock_unlock(&knet_h->global_rwlock); } set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_STOPPED); return NULL; } /* * sctp_link_listener_start/stop are called in global write lock * context from set_config and clear_config. */ static sctp_listen_link_info_t *sctp_link_listener_start(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int listen_sock = -1; struct epoll_event ev; sctp_listen_link_info_t *info = NULL; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * Only allocate a new listener if src address is different */ qb_list_for_each_entry(info, &handle_info->listen_links_list, list) { if (memcmp(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)) == 0) { - if ((check_add(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, -1, + if ((check_add(knet_h, kn_link, -1, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { return NULL; } return info; } } info = malloc(sizeof(sctp_listen_link_info_t)); if (!info) { err = -1; goto exit_error; } memset(info, 0, sizeof(sctp_listen_link_info_t)); memset(info->accepted_socks, -1, sizeof(info->accepted_socks)); memmove(&info->src_address, &kn_link->src_addr, sizeof(struct sockaddr_storage)); listen_sock = socket(kn_link->src_addr.ss_family, SOCK_STREAM, IPPROTO_SCTP); if (listen_sock < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create listener socket: %s", strerror(savederrno)); goto exit_error; } if (_configure_sctp_socket(knet_h, listen_sock, &kn_link->src_addr, kn_link->flags, "SCTP listener") < 0) { savederrno = errno; err = -1; goto exit_error; } if (bind(listen_sock, (struct sockaddr *)&kn_link->src_addr, sockaddr_len(&kn_link->src_addr)) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to bind listener socket: %s", strerror(savederrno)); goto exit_error; } if (listen(listen_sock, 5) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to listen on listener socket: %s", strerror(savederrno)); goto exit_error; } if (_set_fd_tracker(knet_h, listen_sock, KNET_TRANSPORT_SCTP, SCTP_LISTENER_LINK_INFO, sockaddr_len(&kn_link->src_addr), info) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } - if ((check_add(knet_h, listen_sock, KNET_TRANSPORT_SCTP, -1, + if ((check_add(knet_h, kn_link, -1, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != EEXIST)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to configure default access lists: %s", strerror(savederrno)); goto exit_error; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 1; info->listen_sock = listen_sock; qb_list_add(&info->list, &handle_info->listen_links_list); log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Listening on fd %d for %s:%s", listen_sock, kn_link->status.src_ipaddr, kn_link->status.src_port); exit_error: if (err) { if ((info) && (info->on_listener_epoll)) { epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, listen_sock, &ev); } if (listen_sock >= 0) { - check_rmall(knet_h, listen_sock, KNET_TRANSPORT_SCTP); + check_rmall(knet_h, kn_link); close(listen_sock); } if (info) { free(info); info = NULL; } } errno = savederrno; return info; } static int sctp_link_listener_stop(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; int found = 0, i; struct knet_host *host; int link_idx; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; sctp_connect_link_info_t *this_link_info = kn_link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; sctp_connect_link_info_t *link_info; struct epoll_event ev; for (host = knet_h->host_head; host != NULL; host = host->next) { for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) { if (&host->link[link_idx] == kn_link) continue; link_info = host->link[link_idx].transport_link; if ((link_info) && (link_info->listener == info)) { found = 1; break; } } } - if ((check_rm(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP, + if ((check_rm(knet_h, kn_link, &kn_link->dst_addr, &kn_link->dst_addr, CHECK_TYPE_ADDRESS, CHECK_ACCEPT) < 0) && (errno != ENOENT)) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove default access lists for %d", info->listen_sock); } if (found) { this_link_info->listener = NULL; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP listener socket %d still in use", info->listen_sock); savederrno = EBUSY; err = -1; goto exit_error; } if (info->on_listener_epoll) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->listen_sock; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, info->listen_sock, &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener to epoll pool: %s", strerror(savederrno)); goto exit_error; } info->on_listener_epoll = 0; } if (_set_fd_tracker(knet_h, info->listen_sock, KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } - check_rmall(knet_h, info->listen_sock, KNET_TRANSPORT_SCTP); - + check_rmall(knet_h, kn_link); close(info->listen_sock); for (i=0; i< MAX_ACCEPTED_SOCKS; i++) { if (info->accepted_socks[i] > -1) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = info->accepted_socks[i]; if (epoll_ctl(knet_h->recv_from_links_epollfd, EPOLL_CTL_DEL, info->accepted_socks[i], &ev)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove EOFed socket from epoll pool: %s", strerror(errno)); } info->on_rx_epoll = 0; free(knet_h->knet_transport_fd_tracker[info->accepted_socks[i]].data); close(info->accepted_socks[i]); if (_set_fd_tracker(knet_h, info->accepted_socks[i], KNET_MAX_TRANSPORTS, SCTP_NO_LINK_INFO, 0, NULL) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set fd tracker: %s", strerror(savederrno)); goto exit_error; } info->accepted_socks[i] = -1; } } qb_list_del(&info->list); free(info); this_link_info->listener = NULL; exit_error: errno = savederrno; return err; } /* * Links config/clear. Both called with global wrlock from link_set_config/clear_config */ int sctp_transport_link_set_config(knet_handle_t knet_h, struct knet_link *kn_link) { int savederrno = 0, err = 0; sctp_connect_link_info_t *info; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; info = malloc(sizeof(sctp_connect_link_info_t)); if (!info) { goto exit_error; } memset(info, 0, sizeof(sctp_connect_link_info_t)); kn_link->transport_link = info; info->link = kn_link; memmove(&info->dst_address, &kn_link->dst_addr, sizeof(struct sockaddr_storage)); info->connect_sock = -1; info->listener = sctp_link_listener_start(knet_h, kn_link); if (!info->listener) { savederrno = errno; err = -1; goto exit_error; } if (kn_link->dynamic == KNET_LINK_STATIC) { if (_create_connect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; goto exit_error; } kn_link->outsock = info->connect_sock; } qb_list_add(&info->list, &handle_info->connect_links_list); exit_error: if (err) { if (info) { if (info->connect_sock >= 0) { close(info->connect_sock); } if (info->listener) { sctp_link_listener_stop(knet_h, kn_link); } kn_link->transport_link = NULL; free(info); } } errno = savederrno; return err; } /* * called with global wrlock */ int sctp_transport_link_clear_config(knet_handle_t knet_h, struct knet_link *kn_link) { int err = 0, savederrno = 0; sctp_connect_link_info_t *info; if (!kn_link) { errno = EINVAL; return -1; } info = kn_link->transport_link; if (!info) { errno = EINVAL; return -1; } if ((sctp_link_listener_stop(knet_h, kn_link) <0) && (errno != EBUSY)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to remove listener transport: %s", strerror(savederrno)); goto exit_error; } if (_close_connect_socket(knet_h, kn_link) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to close connected socket: %s", strerror(savederrno)); goto exit_error; } qb_list_del(&info->list); free(info); kn_link->transport_link = NULL; exit_error: errno = savederrno; return err; } /* * transport_free and transport_init are * called only from knet_handle_new and knet_handle_free. * all resources (hosts/links) should have been already freed at this point * and they are called in a write locked context, hence they * don't need their own locking. */ int sctp_transport_free(knet_handle_t knet_h) { sctp_handle_info_t *handle_info; void *thread_status; struct epoll_event ev; if (!knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EINVAL; return -1; } handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; /* * keep it here while we debug list usage and such */ if (!qb_list_empty(&handle_info->listen_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. listen links list is not empty"); } if (!qb_list_empty(&handle_info->connect_links_list)) { log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Internal error. connect links list is not empty"); } if (handle_info->listen_thread) { pthread_cancel(handle_info->listen_thread); pthread_join(handle_info->listen_thread, &thread_status); } if (handle_info->connect_thread) { pthread_cancel(handle_info->connect_thread); pthread_join(handle_info->connect_thread, &thread_status); } if (handle_info->listensockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_DEL, handle_info->listensockfd[0], &ev); } if (handle_info->connectsockfd[0] >= 0) { memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_DEL, handle_info->connectsockfd[0], &ev); } _close_socketpair(knet_h, handle_info->connectsockfd); _close_socketpair(knet_h, handle_info->listensockfd); if (handle_info->listen_epollfd >= 0) { close(handle_info->listen_epollfd); } if (handle_info->connect_epollfd >= 0) { close(handle_info->connect_epollfd); } free(handle_info->event_subscribe_buffer); free(handle_info); knet_h->transports[KNET_TRANSPORT_SCTP] = NULL; return 0; } static int _sctp_subscribe_init(knet_handle_t knet_h) { int test_socket, savederrno; sctp_handle_info_t *handle_info = knet_h->transports[KNET_TRANSPORT_SCTP]; char dummy_events[100]; struct sctp_event_subscribe *events; /* Below we set the first 6 fields of this expanding struct. * SCTP_EVENTS is deprecated, but SCTP_EVENT is not available * on Linux; on the other hand, FreeBSD and old Linux does not * accept small transfers, so we can't simply use this minimum * everywhere. Thus we query and store the native size. */ const unsigned int subscribe_min = 6; test_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_SCTP); if (test_socket < 0) { if (errno == EPROTONOSUPPORT) { log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "SCTP not supported, skipping initialization"); return 0; } savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create test socket: %s", strerror(savederrno)); return savederrno; } handle_info->event_subscribe_kernel_size = sizeof dummy_events; if (getsockopt(test_socket, IPPROTO_SCTP, SCTP_EVENTS, &dummy_events, &handle_info->event_subscribe_kernel_size)) { close(test_socket); savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to query kernel size of struct sctp_event_subscribe: %s", strerror(savederrno)); return savederrno; } close(test_socket); if (handle_info->event_subscribe_kernel_size < subscribe_min) { savederrno = ERANGE; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "No kernel support for the necessary notifications: struct sctp_event_subscribe is %u bytes, %u needed", handle_info->event_subscribe_kernel_size, subscribe_min); return savederrno; } events = malloc(handle_info->event_subscribe_kernel_size); if (!events) { savederrno = errno; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Failed to allocate event subscribe buffer: %s", strerror(savederrno)); return savederrno; } memset(events, 0, handle_info->event_subscribe_kernel_size); events->sctp_data_io_event = 1; events->sctp_association_event = 1; events->sctp_address_event = 1; events->sctp_send_failure_event = 1; events->sctp_peer_error_event = 1; events->sctp_shutdown_event = 1; handle_info->event_subscribe_buffer = (char *)events; log_debug(knet_h, KNET_SUB_TRANSP_SCTP, "Size of struct sctp_event_subscribe is %u in kernel, %zu in user space", handle_info->event_subscribe_kernel_size, sizeof(struct sctp_event_subscribe)); return 0; } int sctp_transport_init(knet_handle_t knet_h) { int err = 0, savederrno = 0; sctp_handle_info_t *handle_info; struct epoll_event ev; if (knet_h->transports[KNET_TRANSPORT_SCTP]) { errno = EEXIST; return -1; } handle_info = malloc(sizeof(sctp_handle_info_t)); if (!handle_info) { return -1; } memset(handle_info, 0,sizeof(sctp_handle_info_t)); knet_h->transports[KNET_TRANSPORT_SCTP] = handle_info; savederrno = _sctp_subscribe_init(knet_h); if (savederrno) { err = -1; goto exit_fail; } qb_list_init(&handle_info->listen_links_list); qb_list_init(&handle_info->connect_links_list); handle_info->listen_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->listen_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll listen fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->listen_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on listen_epollfd: %s", strerror(savederrno)); goto exit_fail; } handle_info->connect_epollfd = epoll_create(KNET_EPOLL_MAX_EVENTS + 1); if (handle_info->connect_epollfd < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to create epoll connect fd: %s", strerror(savederrno)); goto exit_fail; } if (_fdset_cloexec(handle_info->connect_epollfd)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to set CLOEXEC on connect_epollfd: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->connectsockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init connect socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->connectsockfd[0]; if (epoll_ctl(handle_info->connect_epollfd, EPOLL_CTL_ADD, handle_info->connectsockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add connectsockfd[0] to connect epoll pool: %s", strerror(savederrno)); goto exit_fail; } if (_init_socketpair(knet_h, handle_info->listensockfd) < 0) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to init listen socketpair: %s", strerror(savederrno)); goto exit_fail; } memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.fd = handle_info->listensockfd[0]; if (epoll_ctl(handle_info->listen_epollfd, EPOLL_CTL_ADD, handle_info->listensockfd[0], &ev)) { savederrno = errno; err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to add listensockfd[0] to listen epoll pool: %s", strerror(savederrno)); goto exit_fail; } /* * Start connect & listener threads */ set_thread_status(knet_h, KNET_THREAD_SCTP_LISTEN, KNET_THREAD_REGISTERED); savederrno = pthread_create(&handle_info->listen_thread, 0, _sctp_listen_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp listen thread: %s", strerror(savederrno)); goto exit_fail; } set_thread_status(knet_h, KNET_THREAD_SCTP_CONN, KNET_THREAD_REGISTERED); savederrno = pthread_create(&handle_info->connect_thread, 0, _sctp_connect_thread, (void *) knet_h); if (savederrno) { err = -1; log_err(knet_h, KNET_SUB_TRANSP_SCTP, "Unable to start sctp connect thread: %s", strerror(savederrno)); goto exit_fail; } exit_fail: if (err < 0) { sctp_transport_free(knet_h); } errno = savederrno; return err; } int sctp_transport_link_dyn_connect(knet_handle_t knet_h, int sockfd, struct knet_link *kn_link) { kn_link->outsock = sockfd; kn_link->status.dynconnected = 1; kn_link->transport_connected = 1; return 0; } int sctp_transport_link_get_acl_fd(knet_handle_t knet_h, struct knet_link *kn_link) { sctp_connect_link_info_t *this_link_info = kn_link->transport_link; sctp_listen_link_info_t *info = this_link_info->listener; return info->listen_sock; } #endif