-
Notifications
You must be signed in to change notification settings - Fork 0
feat(cli): added cli validator tests #286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b45292d
ccb3c68
2bc8118
2ee355b
073a39d
edffe34
8c0f4cf
3553cc5
3066fd5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,56 @@ | ||
| //! Validator client connectivity tests. | ||
|
|
||
| use super::{TestCategoryResult, TestConfigArgs}; | ||
| use crate::error::Result; | ||
| use clap::Args; | ||
| use std::{io::Write, time::Duration}; | ||
|
|
||
| use clap::Args; | ||
| use rand::Rng; | ||
| use tokio::{ | ||
| net::TcpStream, | ||
| sync::mpsc, | ||
| time::{Instant, timeout}, | ||
| }; | ||
|
|
||
| use super::{ | ||
| AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict, | ||
| calculate_score, evaluate_highest_rtt, evaluate_rtt, publish_result_to_obol_api, | ||
| write_result_to_file, write_result_to_writer, | ||
| }; | ||
| use crate::{duration::Duration as CliDuration, error::Result}; | ||
|
|
||
| // Thresholds (from Go implementation) | ||
| const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50); | ||
| const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240); | ||
| const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50); | ||
| const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240); | ||
|
|
||
| /// Validator test cases. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||
| pub enum ValidatorTestCase { | ||
| Ping, | ||
| PingMeasure, | ||
| PingLoad, | ||
| } | ||
|
|
||
| impl ValidatorTestCase { | ||
| /// Returns all validator test cases. | ||
| pub fn all() -> &'static [ValidatorTestCase] { | ||
| &[ | ||
| ValidatorTestCase::Ping, | ||
| ValidatorTestCase::PingMeasure, | ||
| ValidatorTestCase::PingLoad, | ||
| ] | ||
| } | ||
|
|
||
| /// Returns the test name as a string. | ||
| pub fn name(&self) -> &'static str { | ||
| match self { | ||
| ValidatorTestCase::Ping => "Ping", | ||
| ValidatorTestCase::PingMeasure => "PingMeasure", | ||
| ValidatorTestCase::PingLoad => "PingLoad", | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Arguments for the validator test command. | ||
| #[derive(Args, Clone, Debug)] | ||
| pub struct TestValidatorArgs { | ||
|
|
@@ -30,10 +76,233 @@ pub struct TestValidatorArgs { | |
| } | ||
|
|
||
| /// Runs the validator client tests. | ||
| pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> { | ||
| // TODO: Implement validator tests | ||
| // - Ping | ||
| // - PingMeasure | ||
| // - PingLoad | ||
| unimplemented!("validator test not yet implemented") | ||
| pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result<TestCategoryResult> { | ||
| tracing::info!("Starting validator client test"); | ||
|
|
||
| let start_time = Instant::now(); | ||
|
|
||
| // Get and filter test cases | ||
| let queued_tests: Vec<ValidatorTestCase> = if let Some(ref filter) = args.test_config.test_cases | ||
| { | ||
| ValidatorTestCase::all() | ||
| .iter() | ||
| .filter(|tc| filter.contains(&tc.name().to_string())) | ||
| .copied() | ||
| .collect() | ||
| } else { | ||
| ValidatorTestCase::all().to_vec() | ||
| }; | ||
|
|
||
| if queued_tests.is_empty() { | ||
| return Err(crate::error::CliError::Other( | ||
| "test case not supported".into(), | ||
| )); | ||
| } | ||
|
|
||
| // Run tests with timeout | ||
| let test_results = run_tests_with_timeout(&args, &queued_tests).await; | ||
|
|
||
| let score = calculate_score(&test_results); | ||
|
|
||
| let mut res = TestCategoryResult::new(TestCategory::Validator); | ||
| res.targets.insert(args.api_address.clone(), test_results); | ||
| res.execution_time = Some(CliDuration::new(start_time.elapsed())); | ||
| res.score = Some(score); | ||
|
|
||
| if !args.test_config.quiet { | ||
| write_result_to_writer(&res, writer)?; | ||
| } | ||
|
|
||
| if !args.test_config.output_json.is_empty() { | ||
| write_result_to_file(&res, args.test_config.output_json.as_ref()).await?; | ||
| } | ||
|
|
||
| if args.test_config.publish { | ||
| let all = AllCategoriesResult { | ||
| validator: Some(res.clone()), | ||
| ..Default::default() | ||
| }; | ||
| publish_result_to_obol_api( | ||
| all, | ||
| &args.test_config.publish_addr, | ||
| &args.test_config.publish_private_key_file, | ||
| ) | ||
| .await?; | ||
| } | ||
|
|
||
| Ok(res) | ||
| } | ||
|
|
||
| /// Timeout error message | ||
| const ERR_TIMEOUT_INTERRUPTED: &str = "timeout"; | ||
|
|
||
| /// Runs tests with timeout, keeping completed tests on timeout. | ||
| async fn run_tests_with_timeout( | ||
| args: &TestValidatorArgs, | ||
| tests: &[ValidatorTestCase], | ||
| ) -> Vec<TestResult> { | ||
| let mut results = Vec::new(); | ||
| let timeout_deadline = tokio::time::Instant::now() | ||
| .checked_add(args.test_config.timeout) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use |
||
| .expect("timeout overflow"); | ||
|
|
||
| for &test_case in tests { | ||
| let remaining = timeout_deadline.saturating_duration_since(tokio::time::Instant::now()); | ||
|
|
||
| match tokio::time::timeout(remaining, run_single_test(args, test_case)).await { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The inner spawned tasks in |
||
| Ok(result) => results.push(result), | ||
| Err(_) => { | ||
| results.push( | ||
| TestResult::new(test_case.name()) | ||
| .fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)), | ||
| ); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| results | ||
| } | ||
|
Comment on lines
+140
to
+165
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not seem to handle the timeout properly. Validate by |
||
|
|
||
| /// Runs a single test case. | ||
| async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult { | ||
| match test_case { | ||
| ValidatorTestCase::Ping => ping_test(args).await, | ||
| ValidatorTestCase::PingMeasure => ping_measure_test(args).await, | ||
| ValidatorTestCase::PingLoad => ping_load_test(args).await, | ||
| } | ||
| } | ||
|
|
||
| async fn ping_test(args: &TestValidatorArgs) -> TestResult { | ||
| let mut result = TestResult::new(ValidatorTestCase::Ping.name()); | ||
|
|
||
| match timeout( | ||
| Duration::from_secs(1), | ||
| TcpStream::connect(&args.api_address), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(Ok(_conn)) => { | ||
| result.verdict = TestVerdict::Ok; | ||
| } | ||
| Ok(Err(e)) => { | ||
| return result.fail(e); | ||
| } | ||
| Err(_) => { | ||
| return result.fail(std::io::Error::new( | ||
| std::io::ErrorKind::TimedOut, | ||
| "connection timeout", | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult { | ||
| let mut result = TestResult::new(ValidatorTestCase::PingMeasure.name()); | ||
| let before = Instant::now(); | ||
|
|
||
| match timeout( | ||
| Duration::from_secs(1), | ||
| TcpStream::connect(&args.api_address), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(Ok(_conn)) => { | ||
| let rtt = before.elapsed(); | ||
| result = evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR); | ||
| } | ||
| Ok(Err(e)) => { | ||
| return result.fail(e); | ||
| } | ||
| Err(_) => { | ||
| return result.fail(std::io::Error::new( | ||
| std::io::ErrorKind::TimedOut, | ||
| "connection timeout", | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_load_test(args: &TestValidatorArgs) -> TestResult { | ||
| tracing::info!( | ||
| duration = ?args.load_test_duration, | ||
| target = %args.api_address, | ||
| "Running ping load tests..." | ||
| ); | ||
|
|
||
| let mut result = TestResult::new(ValidatorTestCase::PingLoad.name()); | ||
|
|
||
| let (tx, mut rx) = mpsc::channel::<Duration>(i16::MAX as usize); | ||
| let address = args.api_address.clone(); | ||
| let duration = args.load_test_duration; | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
| let start = Instant::now(); | ||
| let mut interval = tokio::time::interval(Duration::from_secs(1)); | ||
| let mut workers = tokio::task::JoinSet::new(); | ||
|
|
||
| interval.tick().await; | ||
| while start.elapsed() < duration { | ||
| interval.tick().await; | ||
|
|
||
| let tx = tx.clone(); | ||
| let addr = address.clone(); | ||
| let remaining = duration.saturating_sub(start.elapsed()); | ||
|
|
||
| workers.spawn(async move { | ||
| ping_continuously(addr, tx, remaining).await; | ||
| }); | ||
| } | ||
|
|
||
| // Drop the scheduler's clone so only workers hold senders | ||
| drop(tx); | ||
|
|
||
| // Wait for all spawned ping workers to finish | ||
| while workers.join_next().await.is_some() {} | ||
| }); | ||
|
|
||
| let _ = handle.await; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This only waits for the scheduler task, not the spawned ping worker tasks themselves. So when calling |
||
|
|
||
| // All senders dropped, collect all RTTs | ||
| rx.close(); | ||
| let mut rtts = Vec::new(); | ||
| while let Some(rtt) = rx.recv().await { | ||
| rtts.push(rtt); | ||
| } | ||
|
|
||
| tracing::info!(target = %args.api_address, "Ping load tests finished"); | ||
|
|
||
| result = evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR); | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_continuously(address: String, tx: mpsc::Sender<Duration>, max_duration: Duration) { | ||
| let start = Instant::now(); | ||
|
|
||
| while start.elapsed() < max_duration { | ||
| let before = Instant::now(); | ||
|
|
||
| match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await { | ||
| Ok(Ok(conn)) => { | ||
| let rtt = before.elapsed(); | ||
| if tx.send(rtt).await.is_err() { | ||
| drop(conn); | ||
| return; | ||
| } | ||
| } | ||
| Ok(Err(e)) => { | ||
| tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test"); | ||
| } | ||
| Err(e) => { | ||
| tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test"); | ||
| } | ||
| } | ||
| let sleep_ms = rand::thread_rng().gen_range(0..100); | ||
| tokio::time::sleep(Duration::from_millis(sleep_ms)).await; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout/interrupted