diff --git a/README.md b/README.md index baaf19e..1007cd3 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ This diagram shows an overview of the architecture: ![architecture](docs/images/greenwave_diagram.png) - For implementation details and inline integration guidance, see [`docs/DESIGN_AND_IMPLEMENTATION.md`](docs/DESIGN_AND_IMPLEMENTATION.md). diff --git a/greenwave_monitor/config/example.yaml b/greenwave_monitor/config/example.yaml index a143411..473dcb5 100644 --- a/greenwave_monitor/config/example.yaml +++ b/greenwave_monitor/config/example.yaml @@ -19,6 +19,13 @@ greenwave_monitor: ros__parameters: + # Controls which timestamp source is used for low-FPS error checks. + # Valid values: + # - header_with_nodetime_fallback (default): header timestamps when available, otherwise node + # time fallback for headerless topics. + # - header_only: use only header/message timestamps (no fallback for headerless topics). + # - nodetime_only: always use node/pub-time checks. + gw_timestamp_monitor_mode: header_with_nodetime_fallback # gw_monitored_topics parameter specifies topics to monitor that do not require expected # frequencies or tolerances. gw_monitored_topics: ['/string_topic'] diff --git a/greenwave_monitor/include/greenwave_diagnostics.hpp b/greenwave_monitor/include/greenwave_diagnostics.hpp index b291d73..e0ca3e1 100644 --- a/greenwave_monitor/include/greenwave_diagnostics.hpp +++ b/greenwave_monitor/include/greenwave_diagnostics.hpp @@ -35,6 +35,13 @@ namespace greenwave_diagnostics { +enum class TimestampMonitorMode +{ + kHeaderWithNodetimeFallback, + kHeaderOnly, + kNodetimeOnly +}; + namespace constants { inline constexpr uint64_t kSecondsToNanoseconds = 1000000000ULL; @@ -51,18 +58,10 @@ inline constexpr int64_t kNonsenseLatencyMs = 365LL * 24LL * 60LL * 60LL * 1000L // Configurations for a greenwave diagnostics struct GreenwaveDiagnosticsConfig { - // diagnostics toggle - bool enable_diagnostics{false}; - - // corresponds to launch arguments - bool enable_all_diagnostics{false}; bool enable_node_time_diagnostics{false}; bool enable_msg_time_diagnostics{false}; bool enable_increasing_msg_time_diagnostics{false}; - // enable basic diagnostics for all topics, triggered by an environment variable - bool enable_all_topic_diagnostics{false}; - // Window size of the mean filter in terms of number of messages received int filter_window_size{300}; @@ -97,6 +96,7 @@ class GreenwaveDiagnostics prev_drop_ts_ = rclcpp::Time(0, 0, clock_->get_clock_type()); prev_noninc_msg_ts_ = rclcpp::Time(0, 0, clock_->get_clock_type()); + prev_drop_node_ts_ = rclcpp::Time(0, 0, clock_->get_clock_type()); prev_timestamp_node_us_ = std::numeric_limits::min(); prev_timestamp_msg_us_ = std::numeric_limits::min(); num_non_increasing_msg_ = 0; @@ -278,11 +278,14 @@ class GreenwaveDiagnostics return message_latency_msg_ms_; } - void setExpectedDt(double expected_hz, double tolerance_percent) + void setExpectedDt( + double expected_hz, double tolerance_percent, + bool enable_node_checks = true, bool enable_msg_checks = true) { const std::lock_guard lock(greenwave_diagnostics_mutex_); - diagnostics_config_.enable_node_time_diagnostics = true; - diagnostics_config_.enable_msg_time_diagnostics = true; + diagnostics_config_.enable_node_time_diagnostics = enable_node_checks; + diagnostics_config_.enable_msg_time_diagnostics = enable_msg_checks; + diagnostics_config_.enable_increasing_msg_time_diagnostics = enable_msg_checks; // This prevents accidental 0 division in the calculations in case of // a direct function call (not from service in greenwave_monitor.cpp) @@ -314,6 +317,7 @@ class GreenwaveDiagnostics const std::lock_guard lock(greenwave_diagnostics_mutex_); diagnostics_config_.enable_node_time_diagnostics = false; diagnostics_config_.enable_msg_time_diagnostics = false; + diagnostics_config_.enable_increasing_msg_time_diagnostics = false; diagnostics_config_.expected_dt_us = 0; diagnostics_config_.jitter_tolerance_us = 0; @@ -388,7 +392,7 @@ class GreenwaveDiagnostics std::vector status_vec_; rclcpp::Clock::SharedPtr clock_; rclcpp::Time t_start_; - rclcpp::Time prev_drop_ts_, prev_noninc_msg_ts_; + rclcpp::Time prev_drop_ts_, prev_noninc_msg_ts_, prev_drop_node_ts_; uint64_t prev_timestamp_node_us_, prev_timestamp_msg_us_; RollingWindow node_window_; @@ -423,6 +427,7 @@ class GreenwaveDiagnostics const bool missed_deadline_node = node_window_.addJitter(abs_jitter_node, diagnostics_config_.jitter_tolerance_us); if (missed_deadline_node) { + prev_drop_node_ts_ = clock_->now(); RCLCPP_DEBUG( node_.get_logger(), "[GreenwaveDiagnostics Node Time]" @@ -434,6 +439,15 @@ class GreenwaveDiagnostics static_cast(diagnostics_config_.jitter_tolerance_us), abs_jitter_node, topic_name_.c_str()); } + if (prev_drop_node_ts_.nanoseconds() != 0) { + auto time_since_drop = (clock_->now() - prev_drop_node_ts_).seconds(); + if (time_since_drop < greenwave_diagnostics::constants::kDropWarnTimeoutSeconds) { + error_found = true; + status_vec_[0].level = diagnostic_msgs::msg::DiagnosticStatus::ERROR; + update_status_message(status_vec_[0], "FRAME DROP DETECTED (NODE TIME)"); + } + } + return error_found; } diff --git a/greenwave_monitor/include/greenwave_monitor.hpp b/greenwave_monitor/include/greenwave_monitor.hpp index d6368c9..84f8a97 100644 --- a/greenwave_monitor/include/greenwave_monitor.hpp +++ b/greenwave_monitor/include/greenwave_monitor.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "rclcpp/rclcpp.hpp" @@ -88,6 +89,10 @@ class GreenwaveMonitor : public rclcpp::Node void add_topics_from_parameters(); + greenwave_diagnostics::TimestampMonitorMode timestamp_monitor_mode_{ + greenwave_diagnostics::TimestampMonitorMode::kHeaderWithNodetimeFallback}; + std::unordered_map topic_has_header_; + std::map> greenwave_diagnostics_; std::vector> subscriptions_; diff --git a/greenwave_monitor/src/greenwave_monitor.cpp b/greenwave_monitor/src/greenwave_monitor.cpp index 3736b51..74a4b6a 100644 --- a/greenwave_monitor/src/greenwave_monitor.cpp +++ b/greenwave_monitor/src/greenwave_monitor.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" @@ -34,9 +35,59 @@ namespace constants inline constexpr const char * kTopicParamPrefix = "gw_frequency_monitored_topics."; inline constexpr const char * kExpectedFrequencySuffix = ".expected_frequency"; inline constexpr const char * kToleranceSuffix = ".tolerance"; +inline constexpr const char * kTimestampMonitorModeParam = "gw_timestamp_monitor_mode"; +inline constexpr const char * kTimestampModeHeaderWithFallback = "header_with_nodetime_fallback"; +inline constexpr const char * kTimestampModeHeaderOnly = "header_only"; +inline constexpr const char * kTimestampModeNodetimeOnly = "nodetime_only"; } // namespace constants } // namespace greenwave_monitor +namespace +{ +greenwave_diagnostics::TimestampMonitorMode parse_timestamp_monitor_mode( + const std::string & mode, rclcpp::Logger logger) +{ + using greenwave_diagnostics::TimestampMonitorMode; + using greenwave_monitor::constants::kTimestampModeHeaderOnly; + using greenwave_monitor::constants::kTimestampModeHeaderWithFallback; + using greenwave_monitor::constants::kTimestampModeNodetimeOnly; + + if (mode == kTimestampModeHeaderWithFallback) { + return TimestampMonitorMode::kHeaderWithNodetimeFallback; + } + if (mode == kTimestampModeHeaderOnly) { + return TimestampMonitorMode::kHeaderOnly; + } + if (mode == kTimestampModeNodetimeOnly) { + return TimestampMonitorMode::kNodetimeOnly; + } + + RCLCPP_WARN( + logger, + "Invalid value '%s' for gw_timestamp_monitor_mode. Falling back to default." + " Allowed values are '%s', '%s', '%s'.", + mode.c_str(), + kTimestampModeHeaderWithFallback, + kTimestampModeHeaderOnly, + kTimestampModeNodetimeOnly); + return TimestampMonitorMode::kHeaderWithNodetimeFallback; +} + +std::pair resolve_timestamp_checks( + greenwave_diagnostics::TimestampMonitorMode mode, bool topic_has_header) +{ + switch (mode) { + case greenwave_diagnostics::TimestampMonitorMode::kHeaderWithNodetimeFallback: + return topic_has_header ? std::make_pair(false, true) : std::make_pair(true, false); + case greenwave_diagnostics::TimestampMonitorMode::kHeaderOnly: + return topic_has_header ? std::make_pair(false, true) : std::make_pair(false, false); + case greenwave_diagnostics::TimestampMonitorMode::kNodetimeOnly: + return std::make_pair(true, false); + } + return std::make_pair(false, false); +} +} // namespace + GreenwaveMonitor::GreenwaveMonitor(const rclcpp::NodeOptions & options) : Node("greenwave_monitor", rclcpp::NodeOptions(options) @@ -48,6 +99,15 @@ GreenwaveMonitor::GreenwaveMonitor(const rclcpp::NodeOptions & options) if (!this->has_parameter("gw_monitored_topics")) { this->declare_parameter>("gw_monitored_topics", {""}); } + if (!this->has_parameter(greenwave_monitor::constants::kTimestampMonitorModeParam)) { + this->declare_parameter( + greenwave_monitor::constants::kTimestampMonitorModeParam, + greenwave_monitor::constants::kTimestampModeHeaderWithFallback); + } + + const auto monitor_mode = this->get_parameter( + greenwave_monitor::constants::kTimestampMonitorModeParam).as_string(); + timestamp_monitor_mode_ = parse_timestamp_monitor_mode(monitor_mode, this->get_logger()); timer_ = this->create_wall_timer( 1s, std::bind(&GreenwaveMonitor::timer_callback, this)); @@ -176,7 +236,19 @@ void GreenwaveMonitor::handle_set_expected_frequency( return; } - msg_diagnostics_obj.setExpectedDt(request->expected_hz, request->tolerance_percent); + bool topic_has_header = false; + if (auto header_it = topic_has_header_.find(request->topic_name); + header_it != topic_has_header_.end()) + { + topic_has_header = header_it->second; + } + const auto [enable_node, enable_msg] = resolve_timestamp_checks( + timestamp_monitor_mode_, topic_has_header); + msg_diagnostics_obj.setExpectedDt( + request->expected_hz, + request->tolerance_percent, + enable_node, + enable_msg); response->success = true; response->message = "Successfully set expected frequency for topic '" + @@ -298,7 +370,7 @@ bool GreenwaveMonitor::add_topic( }); greenwave_diagnostics::GreenwaveDiagnosticsConfig diagnostics_config; - diagnostics_config.enable_all_topic_diagnostics = true; + topic_has_header_[topic] = has_header_from_type(type); subscriptions_.push_back(sub); greenwave_diagnostics_.emplace( @@ -330,6 +402,7 @@ bool GreenwaveMonitor::remove_topic(const std::string & topic, std::string & mes } greenwave_diagnostics_.erase(diag_it); + topic_has_header_.erase(topic); message = "Successfully removed topic"; return true; } @@ -444,7 +517,17 @@ void GreenwaveMonitor::add_topics_from_parameters() static const double retry_wait_s = 0.5; if (add_topic(topic, message, max_retries, retry_wait_s)) { if (expected_frequency > 0.0) { - greenwave_diagnostics_[topic]->setExpectedDt(expected_frequency, tolerance); + bool topic_has_header = false; + if (auto header_it = topic_has_header_.find(topic); header_it != topic_has_header_.end()) { + topic_has_header = header_it->second; + } + const auto [enable_node, enable_msg] = resolve_timestamp_checks( + timestamp_monitor_mode_, topic_has_header); + greenwave_diagnostics_[topic]->setExpectedDt( + expected_frequency, + tolerance, + enable_node, + enable_msg); } else { RCLCPP_WARN( this->get_logger(), diff --git a/greenwave_monitor/src/minimal_publisher_node.cpp b/greenwave_monitor/src/minimal_publisher_node.cpp index 88b695b..c49c95e 100644 --- a/greenwave_monitor/src/minimal_publisher_node.cpp +++ b/greenwave_monitor/src/minimal_publisher_node.cpp @@ -83,7 +83,6 @@ MinimalPublisher::MinimalPublisher(const rclcpp::NodeOptions & options) std::chrono::nanoseconds(period_ns), std::bind(&MinimalPublisher::timer_callback, this)); greenwave_diagnostics::GreenwaveDiagnosticsConfig diagnostics_config; - diagnostics_config.enable_all_topic_diagnostics = true; greenwave_diagnostics_ = std::make_unique( *this, topic, diagnostics_config); } diff --git a/greenwave_monitor/test/test_greenwave_diagnostics.cpp b/greenwave_monitor/test/test_greenwave_diagnostics.cpp index c201049..34128d7 100644 --- a/greenwave_monitor/test/test_greenwave_diagnostics.cpp +++ b/greenwave_monitor/test/test_greenwave_diagnostics.cpp @@ -62,6 +62,46 @@ class GreenwaveDiagnosticsTest : public ::testing::Test std::shared_ptr node_; }; +diagnostic_msgs::msg::DiagnosticStatus run_low_fps_sequence( + const std::shared_ptr & node, + const greenwave_diagnostics::GreenwaveDiagnosticsConfig & config, + double expected_hz, + double tolerance_percent, + bool enable_node_checks, + bool enable_msg_checks) +{ + greenwave_diagnostics::GreenwaveDiagnostics diagnostics(*node, "test_topic", config); + diagnostics.setExpectedDt( + expected_hz, tolerance_percent, enable_node_checks, enable_msg_checks); + std::vector received_diagnostics; + const auto diagnostic_subscription = + node->create_subscription( + "/diagnostics", 10, + [&received_diagnostics](const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg) { + received_diagnostics.push_back(msg); + }); + (void)diagnostic_subscription; + + constexpr int samples = 15; + constexpr int64_t input_dt_ns = 10000000LL; // 100 Hz message timestamp cadence + uint64_t msg_timestamp = test_constants::kStartTimestampNs; + for (int i = 0; i < samples; ++i) { + diagnostics.updateDiagnostics(msg_timestamp); + diagnostics.publishDiagnostics(); + rclcpp::spin_some(node); + msg_timestamp += static_cast(input_dt_ns); + std::this_thread::sleep_for(std::chrono::milliseconds(25)); // ~40 Hz node cadence + } + + if (received_diagnostics.empty() || received_diagnostics.back()->status.empty()) { + diagnostic_msgs::msg::DiagnosticStatus empty_status; + empty_status.level = diagnostic_msgs::msg::DiagnosticStatus::STALE; + return empty_status; + } + + return received_diagnostics.back()->status[0]; +} + TEST_F(GreenwaveDiagnosticsTest, FrameRateMsgTest) { // Initialize GreenwaveDiagnostics @@ -212,7 +252,8 @@ TEST_F(GreenwaveDiagnosticsTest, DiagnosticPublishSubscribeTest) const auto & diagnostic_status = last_diagnostic->status[0]; EXPECT_TRUE(diagnostic_status.name.find("test_topic") != std::string::npos); EXPECT_EQ(diagnostic_status.level, diagnostic_msgs::msg::DiagnosticStatus::ERROR); - EXPECT_EQ(diagnostic_status.message, "FRAME DROP DETECTED, NONINCREASING TIMESTAMP"); + EXPECT_NE(diagnostic_status.message.find("FRAME DROP DETECTED"), std::string::npos); + EXPECT_NE(diagnostic_status.message.find("NONINCREASING TIMESTAMP"), std::string::npos); // Parse diagnostic values std::map diagnostics_values = { @@ -276,3 +317,56 @@ TEST_F(GreenwaveDiagnosticsTest, DiagnosticPublishSubscribeTest) EXPECT_GE(diagnostics_values["total_dropped_frames"], 1.0); EXPECT_GE(diagnostics_values["num_non_increasing_msg"], 1.0); } + +TEST_F(GreenwaveDiagnosticsTest, HeaderWithFallbackHeaderlessEnablesNodeChecksOnly) +{ + greenwave_diagnostics::GreenwaveDiagnosticsConfig config; + const auto status = run_low_fps_sequence( + node_, config, 100.0, 10.0, true, false); + + EXPECT_EQ(status.level, diagnostic_msgs::msg::DiagnosticStatus::ERROR); + EXPECT_NE(status.message.find("FRAME DROP DETECTED (NODE TIME)"), std::string::npos); +} + +TEST_F(GreenwaveDiagnosticsTest, HeaderOnlyHeaderlessDisablesBothChecks) +{ + greenwave_diagnostics::GreenwaveDiagnosticsConfig config; + const auto status = run_low_fps_sequence( + node_, config, 100.0, 10.0, false, false); + + EXPECT_EQ(status.level, diagnostic_msgs::msg::DiagnosticStatus::OK); + EXPECT_EQ(status.message, "OK"); +} + +TEST_F(GreenwaveDiagnosticsTest, HeaderWithFallbackHeaderedEnablesMsgChecksOnly) +{ + greenwave_diagnostics::GreenwaveDiagnosticsConfig config; + + // Message timestamp cadence in run_low_fps_sequence is 100 Hz, so message-time checks + // should remain healthy even though node-time cadence is slowed down to ~40 Hz. + const auto status = run_low_fps_sequence( + node_, config, 100.0, 10.0, false, true); + + EXPECT_EQ(status.level, diagnostic_msgs::msg::DiagnosticStatus::OK); + EXPECT_EQ(status.message, "OK"); +} + +TEST_F(GreenwaveDiagnosticsTest, NodetimeOnlyHeaderedEnablesNodeChecksOnly) +{ + greenwave_diagnostics::GreenwaveDiagnosticsConfig config; + const auto status = run_low_fps_sequence( + node_, config, 100.0, 10.0, true, false); + + EXPECT_EQ(status.level, diagnostic_msgs::msg::DiagnosticStatus::ERROR); + EXPECT_NE(status.message.find("FRAME DROP DETECTED (NODE TIME)"), std::string::npos); +} + +TEST_F(GreenwaveDiagnosticsTest, NodeTimeJitterReportsError) +{ + greenwave_diagnostics::GreenwaveDiagnosticsConfig config; + const auto status = run_low_fps_sequence( + node_, config, 100.0, 10.0, true, false); + + EXPECT_EQ(status.level, diagnostic_msgs::msg::DiagnosticStatus::ERROR); + EXPECT_NE(status.message.find("FRAME DROP DETECTED (NODE TIME)"), std::string::npos); +} diff --git a/greenwave_monitor/test/test_greenwave_monitor.py b/greenwave_monitor/test/test_greenwave_monitor.py index 4fd10c4..1def588 100644 --- a/greenwave_monitor/test/test_greenwave_monitor.py +++ b/greenwave_monitor/test/test_greenwave_monitor.py @@ -23,6 +23,7 @@ import time import unittest +from diagnostic_msgs.msg import DiagnosticStatus from greenwave_monitor.test_utils import ( call_manage_topic_service, collect_diagnostics_for_topic, @@ -69,6 +70,25 @@ NONEXISTENT_EXPECTED_FREQUENCY = 10.0 NONEXISTENT_TOLERANCE = 1.0 +TIMESTAMP_MODE_EXPECTED_FREQUENCY = 100.0 +TIMESTAMP_MODE_TOLERANCE = 10.0 +TIMESTAMP_MODE_PUBLISH_FREQUENCY = 20.0 + +TIMESTAMP_MODE_HEADER_FALLBACK_NAMESPACE = 'timestamp_mode_header_fallback_ns' +TIMESTAMP_MODE_HEADER_ONLY_NAMESPACE = 'timestamp_mode_header_only_ns' +TIMESTAMP_MODE_NODETIME_ONLY_NAMESPACE = 'timestamp_mode_nodetime_only_ns' +TIMESTAMP_MODE_INVALID_NAMESPACE = 'timestamp_mode_invalid_ns' + +TIMESTAMP_MODE_HEADER_FALLBACK_NODE = 'timestamp_mode_header_fallback_monitor' +TIMESTAMP_MODE_HEADER_ONLY_NODE = 'timestamp_mode_header_only_monitor' +TIMESTAMP_MODE_NODETIME_ONLY_NODE = 'timestamp_mode_nodetime_only_monitor' +TIMESTAMP_MODE_INVALID_NODE = 'timestamp_mode_invalid_monitor' + +TIMESTAMP_MODE_HEADER_FALLBACK_TOPIC = '/timestamp_mode_header_fallback_topic' +TIMESTAMP_MODE_HEADER_ONLY_HEADERLESS_TOPIC = '/timestamp_mode_header_only_headerless_topic' +TIMESTAMP_MODE_NODETIME_ONLY_TOPIC = '/timestamp_mode_nodetime_only_topic' +TIMESTAMP_MODE_INVALID_TOPIC = '/timestamp_mode_invalid_topic' + def create_test_yaml_config(): """Create a temporary YAML config file for testing parameter loading.""" @@ -96,6 +116,26 @@ def create_test_yaml_config(): return filepath +def create_timestamp_mode_yaml_config(mode, topic, expected_frequency, tolerance): + """Create a temporary YAML config file for timestamp mode integration tests.""" + yaml_content = f"""\ +/**: + ros__parameters: + gw_monitored_topics: ['{topic}'] + gw_timestamp_monitor_mode: '{mode}' + gw_frequency_monitored_topics: + {topic}: + expected_frequency: {expected_frequency} + tolerance: {tolerance} +""" + safe_mode = mode.replace('/', '_') + safe_topic = topic.replace('/', '_') + filepath = os.path.join(_temp_dir.name, f'timestamp_mode_{safe_mode}_{safe_topic}.yaml') + with open(filepath, 'w') as f: + f.write(yaml_content) + return filepath + + @pytest.mark.launch_test @launch_testing.parametrize('message_type, expected_frequency, tolerance_hz', TEST_CONFIGURATIONS) def generate_test_description(message_type, expected_frequency, tolerance_hz): @@ -129,6 +169,80 @@ def generate_test_description(message_type, expected_frequency, tolerance_hz): expected_frequency, message_type, '_invalid_expected_frequency') ] + timestamp_mode_entities = [] + if (message_type, expected_frequency, tolerance_hz) == MANAGE_TOPIC_TEST_CONFIG: + header_fallback_yaml = create_timestamp_mode_yaml_config( + 'header_with_nodetime_fallback', + TIMESTAMP_MODE_HEADER_FALLBACK_TOPIC, + TIMESTAMP_MODE_EXPECTED_FREQUENCY, + TIMESTAMP_MODE_TOLERANCE, + ) + header_only_yaml = create_timestamp_mode_yaml_config( + 'header_only', + TIMESTAMP_MODE_HEADER_ONLY_HEADERLESS_TOPIC, + TIMESTAMP_MODE_EXPECTED_FREQUENCY, + TIMESTAMP_MODE_TOLERANCE, + ) + nodetime_only_yaml = create_timestamp_mode_yaml_config( + 'nodetime_only', + TIMESTAMP_MODE_NODETIME_ONLY_TOPIC, + TIMESTAMP_MODE_EXPECTED_FREQUENCY, + TIMESTAMP_MODE_TOLERANCE, + ) + invalid_mode_yaml = create_timestamp_mode_yaml_config( + 'not_a_real_mode', + TIMESTAMP_MODE_INVALID_TOPIC, + TIMESTAMP_MODE_EXPECTED_FREQUENCY, + TIMESTAMP_MODE_TOLERANCE, + ) + + timestamp_mode_entities = [ + create_monitor_node( + namespace=TIMESTAMP_MODE_HEADER_FALLBACK_NAMESPACE, + node_name=TIMESTAMP_MODE_HEADER_FALLBACK_NODE, + parameters=[header_fallback_yaml], + ), + create_monitor_node( + namespace=TIMESTAMP_MODE_HEADER_ONLY_NAMESPACE, + node_name=TIMESTAMP_MODE_HEADER_ONLY_NODE, + parameters=[header_only_yaml], + ), + create_monitor_node( + namespace=TIMESTAMP_MODE_NODETIME_ONLY_NAMESPACE, + node_name=TIMESTAMP_MODE_NODETIME_ONLY_NODE, + parameters=[nodetime_only_yaml], + ), + create_monitor_node( + namespace=TIMESTAMP_MODE_INVALID_NAMESPACE, + node_name=TIMESTAMP_MODE_INVALID_NODE, + parameters=[invalid_mode_yaml], + ), + create_minimal_publisher( + TIMESTAMP_MODE_HEADER_FALLBACK_TOPIC, + TIMESTAMP_MODE_PUBLISH_FREQUENCY, + 'imu', + '_timestamp_mode_header_fallback', + ), + create_minimal_publisher( + TIMESTAMP_MODE_HEADER_ONLY_HEADERLESS_TOPIC, + TIMESTAMP_MODE_PUBLISH_FREQUENCY, + 'string', + '_timestamp_mode_header_only_headerless', + ), + create_minimal_publisher( + TIMESTAMP_MODE_NODETIME_ONLY_TOPIC, + TIMESTAMP_MODE_PUBLISH_FREQUENCY, + 'imu', + '_timestamp_mode_nodetime_only', + ), + create_minimal_publisher( + TIMESTAMP_MODE_INVALID_TOPIC, + TIMESTAMP_MODE_PUBLISH_FREQUENCY, + 'imu', + '_timestamp_mode_invalid', + ), + ] + context = { 'expected_frequency': expected_frequency, 'message_type': message_type, @@ -139,6 +253,7 @@ def generate_test_description(message_type, expected_frequency, tolerance_hz): launch.LaunchDescription([ ros2_monitor_node, *publishers, # Unpack all publishers into the launch description + *timestamp_mode_entities, launch_testing.actions.ReadyToTest() ]), context ) @@ -239,6 +354,17 @@ def call_manage_topic(self, add, topic, service_client): self.assertIsNotNone(response, 'Service call failed or timed out') return response + def check_monitor_services(self, namespace, node_name): + """Check that monitor services are available for a specific node.""" + manage_client, _ = create_service_clients(self.test_node, namespace, node_name) + service_available = wait_for_service_connection( + self.test_node, manage_client, timeout_sec=10.0, + service_name=f'/{namespace}/{node_name}/manage_topic' + ) + self.assertTrue( + service_available, + f'Service "/{namespace}/{node_name}/manage_topic" not available within timeout') + def test_manage_one_topic(self, expected_frequency, message_type, tolerance_hz): """Test that add_topic() and remove_topic() work correctly for one topic.""" if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: @@ -415,6 +541,82 @@ def test_yaml_parameter_loading(self, expected_frequency, message_type, toleranc f'Topic {NONEXISTENT_TOPIC} should not be monitored' ) + def test_timestamp_mode_parameter_parsing( + self, expected_frequency, message_type, tolerance_hz): + """Test that all timestamp monitor modes parse and start successfully.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running timestamp mode parsing tests once') + + self.check_monitor_services( + TIMESTAMP_MODE_HEADER_FALLBACK_NAMESPACE, TIMESTAMP_MODE_HEADER_FALLBACK_NODE) + self.check_monitor_services( + TIMESTAMP_MODE_HEADER_ONLY_NAMESPACE, TIMESTAMP_MODE_HEADER_ONLY_NODE) + self.check_monitor_services( + TIMESTAMP_MODE_NODETIME_ONLY_NAMESPACE, TIMESTAMP_MODE_NODETIME_ONLY_NODE) + + def test_nodetime_only_detects_low_fps(self, expected_frequency, message_type, tolerance_hz): + """Test nodetime_only mode reports low-FPS errors on headered topics.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running nodetime_only timestamp mode test once') + + received_diagnostics = collect_diagnostics_for_topic( + self.test_node, + TIMESTAMP_MODE_NODETIME_ONLY_TOPIC, + expected_count=5, + timeout_sec=15.0 + ) + self.assertGreaterEqual(len(received_diagnostics), 1) + + error_statuses = [s for s in received_diagnostics if s.level == DiagnosticStatus.ERROR] + self.assertTrue( + any('FRAME DROP DETECTED (NODE TIME)' in s.message for s in error_statuses), + 'Expected nodetime_only mode to report node-time frame drop errors') + + def test_header_only_no_error_for_headerless( + self, expected_frequency, message_type, tolerance_hz): + """Test header_only mode produces no error for headerless topics.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running header_only timestamp mode test once') + + received_diagnostics = collect_diagnostics_for_topic( + self.test_node, + TIMESTAMP_MODE_HEADER_ONLY_HEADERLESS_TOPIC, + expected_count=5, + timeout_sec=15.0 + ) + self.assertGreaterEqual(len(received_diagnostics), 1) + + self.assertFalse( + any(status.level == DiagnosticStatus.ERROR for status in received_diagnostics), + 'Header-only mode should not report errors for headerless topics') + self.assertTrue( + any(status.level == DiagnosticStatus.OK for status in received_diagnostics), + 'Header-only mode should still publish OK diagnostics for headerless topics') + + def test_invalid_mode_falls_back_to_default( + self, expected_frequency, message_type, tolerance_hz): + """Test invalid mode strings fall back to default behavior.""" + if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG: + self.skipTest('Only running invalid timestamp mode test once') + + self.check_monitor_services(TIMESTAMP_MODE_INVALID_NAMESPACE, TIMESTAMP_MODE_INVALID_NODE) + + received_diagnostics = collect_diagnostics_for_topic( + self.test_node, + TIMESTAMP_MODE_INVALID_TOPIC, + expected_count=5, + timeout_sec=15.0 + ) + self.assertGreaterEqual(len(received_diagnostics), 1) + + error_statuses = [s for s in received_diagnostics if s.level == DiagnosticStatus.ERROR] + self.assertTrue( + any('FRAME DROP DETECTED' in s.message for s in error_statuses), + 'Invalid mode should fall back to default mode and report frame-drop errors') + self.assertTrue( + all('FRAME DROP DETECTED (NODE TIME)' not in s.message for s in error_statuses), + 'Invalid mode fallback should use header checks for headered topics') + if __name__ == '__main__': unittest.main()