diff --git a/Cargo.lock b/Cargo.lock index 085648b8..8075fe8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -417,6 +417,16 @@ dependencies = [ "log", ] +[[package]] +name = "epoll" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e74d68fe2927dbf47aa976d14d93db9b23dced457c7bb2bdc6925a16d31b736e" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -441,6 +451,7 @@ dependencies = [ "aya", "clap", "env_logger", + "epoll", "fact-api", "fact-ebpf", "http-body-util", diff --git a/Cargo.toml b/Cargo.toml index 742c25f4..c726261b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ aya = { version = "0.13.1", default-features = false } anyhow = { version = "1", default-features = false, features = ["std", "backtrace"] } clap = { version = "4.5.41", features = ["derive", "env"] } env_logger = { version = "0.11.5", default-features = false, features = ["humantime"] } +epoll = "4.4.0" http-body-util = "0.1.3" hyper = { version = "1.6.0", default-features = false } hyper-tls = "0.6.0" diff --git a/about.toml b/about.toml index 26d24936..c1e5e27c 100644 --- a/about.toml +++ b/about.toml @@ -5,5 +5,6 @@ accepted = [ "Zlib", "ISC", "Unicode-3.0", + "MPL-2.0", ] diff --git a/fact/Cargo.toml b/fact/Cargo.toml index 3b84db24..22edd2b9 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -29,6 +29,7 @@ serde = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } yaml-rust2 = { workspace = true } +epoll = { workspace = true } fact-api = { path = "../fact-api" } fact-ebpf = { path = "../fact-ebpf" } diff --git a/fact/src/bpf/mod.rs b/fact/src/bpf/mod.rs index ec9ca57f..2f7c3cbe 100644 --- a/fact/src/bpf/mod.rs +++ b/fact/src/bpf/mod.rs @@ -1,4 +1,9 @@ -use std::{io, path::PathBuf}; +use std::{ + io, + os::fd::AsRawFd, + path::PathBuf, + thread::{self, JoinHandle}, +}; use anyhow::{bail, Context}; use aya::{ @@ -9,11 +14,7 @@ use aya::{ use checks::Checks; use libc::c_char; use log::{error, info}; -use tokio::{ - io::unix::AsyncFd, - sync::{mpsc, watch}, - task::JoinHandle, -}; +use tokio::sync::{mpsc, watch}; use crate::{event::Event, host_info, metrics::EventCounter}; @@ -172,25 +173,43 @@ impl Bpf { // Gather events from the ring buffer and print them out. pub fn start( mut self, - mut running: watch::Receiver, + running: watch::Receiver, event_counter: EventCounter, ) -> JoinHandle> { info!("Starting BPF worker..."); - tokio::spawn(async move { + thread::spawn(move || { self.attach_progs() .context("Failed to attach ebpf programs")?; - let rb = self.take_ringbuffer()?; - let mut fd = AsyncFd::new(rb)?; + let mut rb = self.take_ringbuffer()?; + + let rb_event = epoll::Event::new(epoll::Events::EPOLLIN, 0); + let poller = match epoll::create(false) { + Ok(p) => p, + Err(e) => bail!("Failed to create epoll: {e:?}"), + }; + if let Err(e) = epoll::ctl( + poller, + epoll::ControlOptions::EPOLL_CTL_ADD, + rb.as_raw_fd(), + rb_event, + ) { + bail!("Failed to add ringbuffer to epoll: {e:?}"); + } loop { - tokio::select! { - guard = fd.readable_mut() => { - let mut guard = guard - .context("ringbuffer guard held while runtime is stopping")?; - let ringbuf = guard.get_inner_mut(); - while let Some(event) = ringbuf.next() { + if running.has_changed()? && !*running.borrow() { + break; + } + + if self.paths_config.has_changed()? { + self.load_paths().context("Failed to load paths")?; + } + + match epoll::wait(poller, 100, &mut [rb_event]) { + Ok(n) if n != 0 => { + while let Some(event) = rb.next() { let event: &event_t = unsafe { &*(event.as_ptr() as *const _) }; let event = match Event::try_from(event) { Ok(event) => event, @@ -202,25 +221,18 @@ impl Bpf { }; event_counter.added(); - if self.tx.send(event).await.is_err() { + if self.tx.blocking_send(event).is_err() { info!("No BPF consumers left, stopping..."); break; } } - guard.clear_ready(); - }, - _ = self.paths_config.changed() => { - self.load_paths().context("Failed to load paths")?; - }, - _ = running.changed() => { - if !*running.borrow() { - info!("Stopping BPF worker..."); - break; - } - }, + } + Ok(_) => {} + Err(e) => bail!("Failed to wait for ringbuffer events: {e:?}"), } } + info!("Stopping BPF worker..."); Ok(()) }) } @@ -242,8 +254,8 @@ mod bpf_tests { use super::*; - #[tokio::test] - async fn test_basic() { + #[test] + fn test_basic() { if let Ok(value) = std::env::var("FACT_LOGLEVEL") { let value = value.to_lowercase(); if value == "debug" || value == "trace" { @@ -266,7 +278,7 @@ mod bpf_tests { let handle = bpf.start(run_rx, exporter.metrics.bpf_worker.clone()); - tokio::time::sleep(Duration::from_millis(500)).await; + thread::sleep(Duration::from_millis(500)); // Create a file let file = NamedTempFile::new_in(monitored_path).expect("Failed to create temporary file"); @@ -316,24 +328,26 @@ mod bpf_tests { // Close the file, removing it file.close().expect("Failed to close temp file"); - let wait = timeout(Duration::from_secs(1), async move { - for expected in expected_events { - println!("expected: {expected:#?}"); - while let Some(event) = rx.recv().await { - println!("{event:#?}"); - if event == expected { - println!("Found!"); - break; + tokio::runtime::Runtime::new().unwrap().block_on(async { + let wait = timeout(Duration::from_secs(1), async { + for expected in expected_events { + println!("expected: {expected:#?}"); + while let Some(event) = rx.recv().await { + println!("{event:#?}"); + if event == expected { + println!("Found!"); + break; + } } } + }); + + tokio::select! { + res = wait => res.unwrap(), } }); - tokio::select! { - res = wait => res.unwrap(), - res = handle => res.unwrap().unwrap(), - } - run_tx.send(false).unwrap(); + handle.join().unwrap().unwrap(); } } diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 880813bb..369ad158 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -94,9 +94,15 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { )?; let mut host_scanner_handle = host_scanner.start(); endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start(); - let mut bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone()); + let bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone()); reloader.start(running.subscribe()); + let (bpf_shutdown_tx, mut bpf_shutdown_rx) = mpsc::channel::>(1); + tokio::task::spawn_blocking(move || { + let res = bpf_handle.join().unwrap(); + bpf_shutdown_tx.blocking_send(res).unwrap(); + }); + let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; loop { @@ -104,12 +110,11 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { _ = tokio::signal::ctrl_c() => break, _ = sigterm.recv() => break, _ = sighup.recv() => config_trigger.notify_one(), - res = bpf_handle.borrow_mut() => { + res = bpf_shutdown_rx.recv() => { match res { - Ok(res) => if let Err(e) = res { - warn!("BPF worker errored out: {e:?}"); - } - Err(e) => warn!("BPF task errored out: {e:?}"), + Some(Ok(())) => info!("BPF worker finished"), + Some(Err(e)) => warn!("BPF worker errored out: {e:?}"), + None => warn!("BPF worker channel closed"), } break; }