From 7c739bae42892ce7d3a9195d0a20ab4d1569695e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Thu, 5 Mar 2026 11:32:22 +0100 Subject: [PATCH 1/3] hmon: add logic monitor Add logic monitor to HMON. --- src/health_monitoring_lib/rust/common.rs | 9 +- src/health_monitoring_lib/rust/lib.rs | 128 +++- .../rust/logic/logic_monitor.rs | 573 ++++++++++++++++++ src/health_monitoring_lib/rust/logic/mod.rs | 16 + src/health_monitoring_lib/rust/tag.rs | 67 +- src/health_monitoring_lib/rust/worker.rs | 7 +- 6 files changed, 794 insertions(+), 6 deletions(-) create mode 100644 src/health_monitoring_lib/rust/logic/logic_monitor.rs create mode 100644 src/health_monitoring_lib/rust/logic/mod.rs diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 5d8c8e3e..eb3904e1 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -14,6 +14,7 @@ use crate::deadline::DeadlineEvaluationError; use crate::heartbeat::HeartbeatEvaluationError; use crate::log::ScoreDebug; +use crate::logic::LogicEvaluationError; use crate::tag::MonitorTag; use core::hash::Hash; use core::time::Duration; @@ -82,7 +83,7 @@ pub(crate) trait Monitor { pub(crate) enum MonitorEvaluationError { Deadline(DeadlineEvaluationError), Heartbeat(HeartbeatEvaluationError), - Logic, + Logic(LogicEvaluationError), } impl From for MonitorEvaluationError { @@ -97,6 +98,12 @@ impl From for MonitorEvaluationError { } } +impl From for MonitorEvaluationError { + fn from(value: LogicEvaluationError) -> Self { + MonitorEvaluationError::Logic(value) + } +} + /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. pub(crate) trait MonitorEvaluator { /// Run monitor evaluation. diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 24187565..4f0f038f 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -21,11 +21,13 @@ mod worker; pub mod deadline; pub mod heartbeat; +pub mod logic; use crate::common::{Monitor, MonitorEvalHandle}; use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder}; use crate::heartbeat::{HeartbeatMonitor, HeartbeatMonitorBuilder}; use crate::log::{error, ScoreDebug}; +use crate::logic::{LogicMonitor, LogicMonitorBuilder}; pub use common::TimeRange; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; @@ -48,6 +50,7 @@ pub enum HealthMonitorError { pub struct HealthMonitorBuilder { deadline_monitor_builders: HashMap, heartbeat_monitor_builders: HashMap, + logic_monitor_builders: HashMap, supervisor_api_cycle: Duration, internal_processing_cycle: Duration, } @@ -58,6 +61,7 @@ impl HealthMonitorBuilder { Self { deadline_monitor_builders: HashMap::new(), heartbeat_monitor_builders: HashMap::new(), + logic_monitor_builders: HashMap::new(), supervisor_api_cycle: Duration::from_millis(500), internal_processing_cycle: Duration::from_millis(100), } @@ -89,6 +93,19 @@ impl HealthMonitorBuilder { self } + /// Add a [`LogicMonitor`] for the given [`MonitorTag`]. + /// + /// - `monitor_tag` - unique tag for the [`LogicMonitor`]. + /// - `monitor_builder` - monitor builder to finalize. + /// + /// # Note + /// + /// If a logic monitor with the same tag already exists, it will be overwritten. + pub fn add_logic_monitor(mut self, monitor_tag: MonitorTag, monitor_builder: LogicMonitorBuilder) -> Self { + self.add_logic_monitor_internal(monitor_tag, monitor_builder); + self + } + /// Set the interval between supervisor API notifications. /// This duration determines how often the health monitor notifies the supervisor about system liveness. /// @@ -121,7 +138,9 @@ impl HealthMonitorBuilder { } // Check number of monitors. - let num_monitors = self.deadline_monitor_builders.len() + self.heartbeat_monitor_builders.len(); + let num_monitors = self.deadline_monitor_builders.len() + + self.heartbeat_monitor_builders.len() + + self.logic_monitor_builders.len(); if num_monitors == 0 { error!("No monitors have been added. HealthMonitor cannot be created."); return Err(HealthMonitorError::WrongState); @@ -144,9 +163,17 @@ impl HealthMonitorBuilder { heartbeat_monitors.insert(tag, Some(MonitorState::Available(monitor))); } + // Create logic monitors. + let mut logic_monitors = HashMap::new(); + for (tag, builder) in self.logic_monitor_builders { + let monitor = builder.build(tag, &allocator)?; + logic_monitors.insert(tag, Some(MonitorState::Available(monitor))); + } + Ok(HealthMonitor { deadline_monitors, heartbeat_monitors, + logic_monitors, worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), supervisor_api_cycle: self.supervisor_api_cycle, }) @@ -170,6 +197,10 @@ impl HealthMonitorBuilder { self.heartbeat_monitor_builders.insert(monitor_tag, monitor_builder); } + pub(crate) fn add_logic_monitor_internal(&mut self, monitor_tag: MonitorTag, monitor_builder: LogicMonitorBuilder) { + self.logic_monitor_builders.insert(monitor_tag, monitor_builder); + } + pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: Duration) { self.supervisor_api_cycle = cycle_duration; } @@ -196,6 +227,7 @@ type MonitorContainer = Option>; pub struct HealthMonitor { deadline_monitors: HashMap>, heartbeat_monitors: HashMap>, + logic_monitors: HashMap>, worker: worker::UniqueThreadRunner, supervisor_api_cycle: Duration, } @@ -241,6 +273,16 @@ impl HealthMonitor { Self::get_monitor(&mut self.heartbeat_monitors, monitor_tag) } + /// Get and pass ownership of a [`LogicMonitor`] for the given [`MonitorTag`]. + /// + /// - `monitor_tag` - unique tag for the [`LogicMonitor`]. + /// + /// Returns [`Some`] containing [`LogicMonitor`] if found and not taken. + /// Otherwise returns [`None`]. + pub fn get_logic_monitor(&mut self, monitor_tag: MonitorTag) -> Option { + Self::get_monitor(&mut self.logic_monitors, monitor_tag) + } + fn collect_given_monitors( monitors_to_collect: &mut HashMap>, collected_monitors: &mut FixedCapacityVec, @@ -287,10 +329,11 @@ impl HealthMonitor { /// Health monitoring logic stops when the [`HealthMonitor`] is dropped. pub fn start(&mut self) -> Result<(), HealthMonitorError> { // Collect all monitors. - let num_monitors = self.deadline_monitors.len() + self.heartbeat_monitors.len(); + let num_monitors = self.deadline_monitors.len() + self.heartbeat_monitors.len() + self.logic_monitors.len(); let mut collected_monitors = FixedCapacityVec::new(num_monitors); Self::collect_given_monitors(&mut self.deadline_monitors, &mut collected_monitors)?; Self::collect_given_monitors(&mut self.heartbeat_monitors, &mut collected_monitors)?; + Self::collect_given_monitors(&mut self.logic_monitors, &mut collected_monitors)?; // Start monitoring logic. let monitoring_logic = worker::MonitoringLogic::new( @@ -315,7 +358,8 @@ mod tests { use crate::common::TimeRange; use crate::deadline::DeadlineMonitorBuilder; use crate::heartbeat::HeartbeatMonitorBuilder; - use crate::tag::MonitorTag; + use crate::logic::LogicMonitorBuilder; + use crate::tag::{MonitorTag, StateTag}; use crate::{HealthMonitorBuilder, HealthMonitorError}; use core::time::Duration; @@ -324,11 +368,20 @@ mod tests { HeartbeatMonitorBuilder::new(range) } + fn def_logic_monitor_builder() -> LogicMonitorBuilder { + let state1 = StateTag::from("state1"); + let state2 = StateTag::from("state2"); + LogicMonitorBuilder::new(state1) + .add_state(state2) + .add_transition((state1, state2)) + } + #[test] fn health_monitor_builder_new_succeeds() { let health_monitor_builder = HealthMonitorBuilder::new(); assert!(health_monitor_builder.deadline_monitor_builders.is_empty()); assert!(health_monitor_builder.heartbeat_monitor_builders.is_empty()); + assert!(health_monitor_builder.logic_monitor_builders.is_empty()); assert_eq!(health_monitor_builder.supervisor_api_cycle, Duration::from_millis(500)); assert_eq!( health_monitor_builder.internal_processing_cycle, @@ -342,10 +395,13 @@ mod tests { let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor"); let heartbeat_monitor_builder = def_heartbeat_monitor_builder(); + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); let result = HealthMonitorBuilder::new() .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) .add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder) + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) .build(); assert!(result.is_ok()); } @@ -475,21 +531,80 @@ mod tests { assert!(result.is_none()); } + #[test] + fn health_monitor_get_logic_monitor_available() { + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.get_logic_monitor(logic_monitor_tag); + assert!(result.is_some()); + } + + #[test] + fn health_monitor_get_logic_monitor_taken() { + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) + .build() + .unwrap(); + + let _ = health_monitor.get_logic_monitor(logic_monitor_tag); + let result = health_monitor.get_logic_monitor(logic_monitor_tag); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_get_logic_monitor_unknown() { + let logic_monitor_builder = def_logic_monitor_builder(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_logic_monitor(MonitorTag::from("logic_monitor"), logic_monitor_builder) + .build() + .unwrap(); + + let result = health_monitor.get_logic_monitor(MonitorTag::from("undefined_monitor")); + assert!(result.is_none()); + } + + #[test] + fn health_monitor_get_logic_monitor_invalid_state() { + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); + let mut health_monitor = HealthMonitorBuilder::new() + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) + .build() + .unwrap(); + + // Inject broken state - unreachable otherwise. + health_monitor.logic_monitors.insert(logic_monitor_tag, None); + + let result = health_monitor.get_logic_monitor(logic_monitor_tag); + assert!(result.is_none()); + } + #[test] fn health_monitor_start_succeeds() { let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor"); let heartbeat_monitor_builder = def_heartbeat_monitor_builder(); + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); let mut health_monitor = HealthMonitorBuilder::new() .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) .add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder) + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) .build() .unwrap(); let _deadline_monitor = health_monitor.get_deadline_monitor(deadline_monitor_tag).unwrap(); let _heartbeat_monitor = health_monitor.get_heartbeat_monitor(heartbeat_monitor_tag).unwrap(); + let _logic_monitor = health_monitor.get_logic_monitor(logic_monitor_tag).unwrap(); let result = health_monitor.start(); assert!(result.is_ok()); @@ -499,10 +614,12 @@ mod tests { fn health_monitor_start_monitors_not_taken() { let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let heartbeat_monitor_builder = def_heartbeat_monitor_builder(); + let logic_monitor_builder = def_logic_monitor_builder(); let mut health_monitor = HealthMonitorBuilder::new() .add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder) .add_heartbeat_monitor(MonitorTag::from("heartbeat_monitor"), heartbeat_monitor_builder) + .add_logic_monitor(MonitorTag::from("logic_monitor"), logic_monitor_builder) .build() .unwrap(); @@ -516,10 +633,13 @@ mod tests { let deadline_monitor_builder = DeadlineMonitorBuilder::new(); let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor"); let heartbeat_monitor_builder = def_heartbeat_monitor_builder(); + let logic_monitor_tag = MonitorTag::from("logic_monitor"); + let logic_monitor_builder = def_logic_monitor_builder(); let mut health_monitor = HealthMonitorBuilder::new() .add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder) .add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder) + .add_logic_monitor(logic_monitor_tag, logic_monitor_builder) .build() .unwrap(); @@ -532,6 +652,8 @@ mod tests { assert!(get_deadline_monitor_result.is_some()); let get_heartbeat_monitor_result = health_monitor.get_heartbeat_monitor(heartbeat_monitor_tag); assert!(get_heartbeat_monitor_result.is_some()); + let get_logic_monitor_result = health_monitor.get_logic_monitor(logic_monitor_tag); + assert!(get_logic_monitor_result.is_some()); // Try to start again, this time should be successful. let start_result = health_monitor.start(); diff --git a/src/health_monitoring_lib/rust/logic/logic_monitor.rs b/src/health_monitoring_lib/rust/logic/logic_monitor.rs new file mode 100644 index 00000000..5ac77c1e --- /dev/null +++ b/src/health_monitoring_lib/rust/logic/logic_monitor.rs @@ -0,0 +1,573 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::common::{Monitor, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; +use crate::log::{error, warn, ScoreDebug}; +use crate::protected_memory::ProtectedMemoryAllocator; +use crate::tag::{MonitorTag, StateTag}; +use crate::HealthMonitorError; +use core::hash::{Hash, Hasher}; +use core::sync::atomic::{AtomicU64, AtomicU8, Ordering}; +use std::hash::DefaultHasher; +use std::sync::Arc; +use std::time::Instant; + +/// Hashed representation of state. +#[derive(PartialEq, Eq)] +struct HashedState(u64); + +impl From for HashedState { + fn from(value: u64) -> Self { + Self(value) + } +} + +impl From for u64 { + fn from(value: HashedState) -> Self { + value.0 + } +} + +impl From for HashedState { + fn from(value: StateTag) -> Self { + Self::from(&value) + } +} + +impl From<&StateTag> for HashedState { + fn from(value: &StateTag) -> Self { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + Self(hasher.finish()) + } +} + +/// Internal OK state representation. +const OK_STATE: u8 = 0; + +/// Logic evaluation errors. +#[repr(u8)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, ScoreDebug)] +pub enum LogicEvaluationError { + /// State is unknown or cannot be determined. + InvalidState = OK_STATE + 1, + /// Transition is invalid. + InvalidTransition, +} + +impl From for u8 { + fn from(value: LogicEvaluationError) -> Self { + value as u8 + } +} + +impl From for LogicEvaluationError { + fn from(value: u8) -> Self { + match value { + value if value == LogicEvaluationError::InvalidState as u8 => LogicEvaluationError::InvalidState, + value if value == LogicEvaluationError::InvalidTransition as u8 => LogicEvaluationError::InvalidTransition, + _ => panic!("Invalid underlying value of logic evaluation error."), + } + } +} + +/// Builder for [`LogicMonitor`]. +#[derive(Debug)] +pub struct LogicMonitorBuilder { + /// Starting state. + initial_state: StateTag, + + /// List of allowed states. + allowed_states: Vec, + + /// List of allowed transitions between states. + allowed_transitions: Vec<(StateTag, StateTag)>, +} + +impl LogicMonitorBuilder { + /// Create a new [`LogicMonitorBuilder`]. + /// + /// - `initial_state` - starting point, implicitly added to the list of allowed states. + pub fn new(initial_state: StateTag) -> Self { + let allowed_states = vec![initial_state]; + Self { + initial_state, + allowed_states, + allowed_transitions: Vec::new(), + } + } + + /// Add allowed state. + pub fn add_state(mut self, state: StateTag) -> Self { + self.add_state_internal(state); + self + } + + /// Add allowed transition. + pub fn add_transition(mut self, transition: (StateTag, StateTag)) -> Self { + self.add_transition_internal(transition); + self + } + + /// Build the [`LogicMonitor`]. + /// + /// - `monitor_tag` - tag of this monitor. + /// - `_allocator` - protected memory allocator. + pub(crate) fn build( + self, + monitor_tag: MonitorTag, + _allocator: &ProtectedMemoryAllocator, + ) -> Result { + // Check number of transitions. + if self.allowed_transitions.is_empty() { + error!("No transitions have been added. LogicMonitor cannot be created."); + return Err(HealthMonitorError::WrongState); + } + + // Check transitions are between allowed states. + for (from, to) in self.allowed_transitions.iter() { + if !self.allowed_states.contains(from) { + error!("Invalid transition definition - 'from' state is unknown: {:?}", from); + return Err(HealthMonitorError::InvalidArgument); + } + if !self.allowed_states.contains(to) { + error!("Invalid transition definition - 'to' state is unknown: {:?}", to); + return Err(HealthMonitorError::InvalidArgument); + } + } + + let inner = Arc::new(LogicMonitorInner::new( + monitor_tag, + self.initial_state, + self.allowed_states, + self.allowed_transitions, + )); + Ok(LogicMonitor::new(inner)) + } + + // FFI internals. + + pub(crate) fn add_state_internal(&mut self, state: StateTag) { + if !self.allowed_states.contains(&state) { + self.allowed_states.push(state); + } + } + + pub(crate) fn add_transition_internal(&mut self, transition: (StateTag, StateTag)) { + if !self.allowed_transitions.contains(&transition) { + self.allowed_transitions.push(transition); + } + } +} + +/// Logic monitor. +pub struct LogicMonitor { + inner: Arc, +} + +impl LogicMonitor { + /// Create a new [`LogicMonitor`] instance. + fn new(inner: Arc) -> Self { + Self { inner } + } + + /// Perform transition to a new state. + pub fn transition(&self, state: StateTag) -> Result { + self.inner.transition(state) + } + + /// Current monitor state. + pub fn state(&self) -> Result { + self.inner.state() + } +} + +impl Monitor for LogicMonitor { + fn get_eval_handle(&self) -> MonitorEvalHandle { + MonitorEvalHandle::new(Arc::clone(&self.inner)) + } +} + +struct LogicMonitorInner { + /// Tag of this monitor. + monitor_tag: MonitorTag, + + /// Hashed current state. + current_state: AtomicU64, + + /// State of the monitor. + /// Contains zero for correct state. + /// Contains [`LogicEvaluationError`] if in erroneous state. + monitor_state: AtomicU8, + + /// List of allowed states. + allowed_states: Vec, + + /// List of allowed transitions between states. + allowed_transitions: Vec<(StateTag, StateTag)>, +} + +impl MonitorEvaluator for LogicMonitorInner { + fn evaluate(&self, _hmon_starting_point: Instant, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { + let monitor_state = self.monitor_state.load(Ordering::Relaxed); + if monitor_state != OK_STATE { + let error = LogicEvaluationError::from(monitor_state); + warn!("Invalid logic monitor state observed: {:?}", error); + on_error(&self.monitor_tag, error.into()); + } + } +} + +impl LogicMonitorInner { + fn new( + monitor_tag: MonitorTag, + initial_state: StateTag, + allowed_states: Vec, + allowed_transitions: Vec<(StateTag, StateTag)>, + ) -> Self { + let current_state = AtomicU64::new(HashedState::from(initial_state).into()); + let monitor_state = AtomicU8::new(0); + LogicMonitorInner { + monitor_tag, + current_state, + monitor_state, + allowed_states, + allowed_transitions, + } + } + + fn transition(&self, new_state: StateTag) -> Result { + // Get current state. + let current_state = self.state()?; + + // Check new state is valid. + if !self.allowed_states.contains(&new_state) { + // Move to `InvalidState` if requested state is not known. + warn!("Requested state transition to unknown state: {:?}", new_state); + let new_monitor_state = LogicEvaluationError::InvalidState; + self.monitor_state.store(new_monitor_state.into(), Ordering::Relaxed); + return Err(new_monitor_state); + } + + // Check transition is valid. + let transition = (current_state, new_state); + if !self.allowed_transitions.contains(&transition) { + // Move to `InvalidTransition` if requested transition is not known. + warn!( + "Requested state transition is invalid: {:?} -> {:?}", + current_state, new_state + ); + let new_monitor_state = LogicEvaluationError::InvalidTransition; + self.monitor_state.store(new_monitor_state.into(), Ordering::Relaxed); + return Err(new_monitor_state); + } + + // Change state and return it. + let hashed_new_state = HashedState::from(new_state); + self.current_state.store(hashed_new_state.into(), Ordering::Relaxed); + + Ok(new_state) + } + + fn state(&self) -> Result { + // Current state cannot be determined. + if self.monitor_state.load(Ordering::Relaxed) != OK_STATE { + warn!("Current logic monitor state cannot be determined"); + return Err(LogicEvaluationError::InvalidState); + } + + // Find current state. + let hashed_state = HashedState::from(self.current_state.load(Ordering::Relaxed)); + let result = self + .allowed_states + .iter() + .find(|e| HashedState::from(*e) == hashed_state); + + // Return current state if found. + // `None` indicates logic error - it should not be possible to successfully change state into an unknown. + match result { + Some(state) => Ok(*state), + None => Err(LogicEvaluationError::InvalidState), + } + } +} + +#[score_testing_macros::test_mod_with_log] +#[cfg(all(test, not(loom)))] +mod tests { + use crate::common::MonitorEvaluator; + use crate::logic::{LogicEvaluationError, LogicMonitorBuilder}; + use crate::protected_memory::ProtectedMemoryAllocator; + use crate::tag::{MonitorTag, StateTag}; + use crate::HealthMonitorError; + use std::time::Instant; + + #[test] + fn logic_monitor_builder_new_succeeds() { + let from_state = StateTag::from("from"); + let builder = LogicMonitorBuilder::new(from_state); + assert_eq!(builder.initial_state, from_state); + assert_eq!(builder.allowed_states, vec![from_state]); + assert!(builder.allowed_transitions.is_empty()); + } + + #[test] + fn logic_monitor_builder_build_succeeds() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let result = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator); + assert!(result.is_ok()); + } + + #[test] + fn logic_monitor_builder_build_no_transitions() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let result = LogicMonitorBuilder::new(from_state).build(monitor_tag, &allocator); + assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); + } + + #[test] + fn logic_monitor_builder_build_unknown_nodes() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + + // Unknown "from". + let result = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((StateTag::from("unknown"), to_state)) + .build(monitor_tag, &allocator); + assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); + + // Unknown "to". + let result = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, StateTag::from("unknown"))) + .build(monitor_tag, &allocator); + assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); + } + + #[test] + fn logic_monitor_transition_succeeds() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + + let result = monitor.transition(to_state); + assert!(result.is_ok_and(|s| s == to_state)); + } + + #[test] + fn logic_monitor_transition_unknown_node() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + + let result = monitor.transition(StateTag::from("unknown")); + assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidState)); + } + + #[test] + fn logic_monitor_transition_indeterminate_current_state() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + + // Trying to transition into unknown state causes monitor to move into indeterminate state. + let _ = monitor.transition(StateTag::from("unknown")); + + // Try to move to known state. + let result = monitor.transition(to_state); + assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidState)); + } + + #[test] + fn logic_monitor_transition_invalid_transition() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let state1 = StateTag::from("state1"); + let state2: StateTag = StateTag::from("state2"); + let state3 = StateTag::from("state3"); + let monitor = LogicMonitorBuilder::new(state1) + .add_state(state2) + .add_state(state3) + .add_transition((state1, state2)) + .add_transition((state2, state3)) + .build(monitor_tag, &allocator) + .unwrap(); + + let result = monitor.transition(state3); + assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidTransition)); + } + + #[test] + fn logic_monitor_state_succeeds() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let state1 = StateTag::from("state1"); + let state2: StateTag = StateTag::from("state2"); + let state3 = StateTag::from("state3"); + let monitor = LogicMonitorBuilder::new(state1) + .add_state(state2) + .add_state(state3) + .add_transition((state1, state2)) + .add_transition((state2, state3)) + .build(monitor_tag, &allocator) + .unwrap(); + + // Check state, perform transition to the next one. + let result = monitor.state(); + assert!(result.is_ok_and(|s| s == state1)); + + let _ = monitor.transition(state2); + let result = monitor.state(); + assert!(result.is_ok_and(|s| s == state2)); + + let _ = monitor.transition(state3); + let result = monitor.state(); + assert!(result.is_ok_and(|s| s == state3)); + } + + #[test] + fn logic_monitor_state_indeterminate_current_state() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + + // Trying to transition into unknown state causes monitor to move into indeterminate state. + let _ = monitor.transition(StateTag::from("unknown")); + + // Try to check state. + let result = monitor.state(); + assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidState)); + } + + #[test] + fn logic_monitor_evaluate_succeeds() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + let hmon_starting_point = Instant::now(); + + monitor.inner.evaluate(hmon_starting_point, &mut |monitor_tag, error| { + panic!("error happened, tag: {monitor_tag:?}, error: {error:?}") + }); + + let _ = monitor.transition(to_state); + + monitor.inner.evaluate(hmon_starting_point, &mut |monitor_tag, error| { + panic!("error happened, tag: {monitor_tag:?}, error: {error:?}") + }); + } + + #[test] + fn logic_monitor_evaluate_invalid_state() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let monitor = LogicMonitorBuilder::new(from_state) + .add_state(to_state) + .add_transition((from_state, to_state)) + .build(monitor_tag, &allocator) + .unwrap(); + let hmon_starting_point = Instant::now(); + + monitor.inner.evaluate(hmon_starting_point, &mut |monitor_tag, error| { + panic!("error happened, tag: {monitor_tag:?}, error: {error:?}") + }); + + let _ = monitor.transition(StateTag::from("unknown")); + + let mut error_happened = false; + monitor + .inner + .evaluate(hmon_starting_point, &mut |monitor_tag_internal, error| { + error_happened = true; + assert_eq!(*monitor_tag_internal, monitor_tag); + assert_eq!(error, LogicEvaluationError::InvalidState.into()) + }); + assert!(error_happened); + } + + #[test] + fn logic_monitor_evaluate_invalid_transition() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let state1 = StateTag::from("state1"); + let state2: StateTag = StateTag::from("state2"); + let state3 = StateTag::from("state3"); + let monitor = LogicMonitorBuilder::new(state1) + .add_state(state2) + .add_state(state3) + .add_transition((state1, state2)) + .add_transition((state2, state3)) + .build(monitor_tag, &allocator) + .unwrap(); + let hmon_starting_point = Instant::now(); + + monitor.inner.evaluate(hmon_starting_point, &mut |monitor_tag, error| { + panic!("error happened, tag: {monitor_tag:?}, error: {error:?}") + }); + + let _ = monitor.transition(state3); + + let mut error_happened = false; + monitor + .inner + .evaluate(hmon_starting_point, &mut |monitor_tag_internal, error| { + error_happened = true; + assert_eq!(*monitor_tag_internal, monitor_tag); + assert_eq!(error, LogicEvaluationError::InvalidTransition.into()) + }); + assert!(error_happened); + } +} diff --git a/src/health_monitoring_lib/rust/logic/mod.rs b/src/health_monitoring_lib/rust/logic/mod.rs new file mode 100644 index 00000000..4323bcf7 --- /dev/null +++ b/src/health_monitoring_lib/rust/logic/mod.rs @@ -0,0 +1,16 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +mod logic_monitor; + +pub use logic_monitor::{LogicEvaluationError, LogicMonitor, LogicMonitorBuilder}; diff --git a/src/health_monitoring_lib/rust/tag.rs b/src/health_monitoring_lib/rust/tag.rs index c0fa4224..c1fb2504 100644 --- a/src/health_monitoring_lib/rust/tag.rs +++ b/src/health_monitoring_lib/rust/tag.rs @@ -172,10 +172,45 @@ impl From<&str> for DeadlineTag { } } +/// State tag. +#[derive(Clone, Copy, Eq, Hash, PartialEq)] +#[repr(C)] +pub struct StateTag(Tag); + +impl fmt::Debug for StateTag { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // SAFETY: the underlying data was created from a valid `&str`. + let bytes = unsafe { core::slice::from_raw_parts(self.0.data, self.0.length) }; + let s = unsafe { core::str::from_utf8_unchecked(bytes) }; + write!(f, "StateTag({})", s) + } +} + +impl log::ScoreDebug for StateTag { + fn fmt(&self, f: log::Writer, _spec: &log::FormatSpec) -> Result<(), log::Error> { + // SAFETY: the underlying data was created from a valid `&str`. + let bytes = unsafe { core::slice::from_raw_parts(self.0.data, self.0.length) }; + let s = unsafe { core::str::from_utf8_unchecked(bytes) }; + log::score_write!(f, "StateTag({})", s) + } +} + +impl From for StateTag { + fn from(value: String) -> Self { + Self(Tag::from(value)) + } +} + +impl From<&str> for StateTag { + fn from(value: &str) -> Self { + Self(Tag::from(value)) + } +} + #[cfg(all(test, not(loom)))] mod tests { use crate::log::score_write; - use crate::tag::{DeadlineTag, MonitorTag, Tag}; + use crate::tag::{DeadlineTag, MonitorTag, StateTag, Tag}; use core::fmt::Write; use core::hash::{Hash, Hasher}; use score_log::fmt::{Error, FormatSpec, Result as FmtResult, ScoreWrite}; @@ -413,4 +448,34 @@ mod tests { let tag = DeadlineTag::from(example_str); compare_tag(tag.0, example_str); } + + #[test] + fn state_tag_debug() { + let example_str = "EXAMPLE"; + let tag = StateTag::from(example_str.to_string()); + assert_eq!(format!("{:?}", tag), "StateTag(EXAMPLE)"); + } + + #[test] + fn state_tag_score_debug() { + let example_str = "EXAMPLE"; + let tag = StateTag::from(example_str.to_string()); + let mut writer = StringWriter::new(); + assert!(score_write!(&mut writer, "{:?}", tag).is_ok()); + assert_eq!(writer.get(), "StateTag(EXAMPLE)"); + } + + #[test] + fn state_tag_from_string() { + let example_str = "EXAMPLE"; + let tag = StateTag::from(example_str.to_string()); + compare_tag(tag.0, example_str); + } + + #[test] + fn state_tag_from_str() { + let example_str = "EXAMPLE"; + let tag = StateTag::from(example_str); + compare_tag(tag.0, example_str); + } } diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index d5f82c3c..1dcc67a9 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -65,7 +65,12 @@ impl MonitoringLogic { monitor_tag, heartbeat_evaluation_error ) }, - MonitorEvaluationError::Logic => unimplemented!(), + MonitorEvaluationError::Logic(logic_evaluation_error) => { + warn!( + "Logic monitor with tag {:?} reported error: {:?}.", + monitor_tag, logic_evaluation_error + ) + }, } }); } From 48bc6f2cf4b4d10a8a95e2d03191951285b48ec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Thu, 5 Mar 2026 15:01:56 +0100 Subject: [PATCH 2/3] hmon: reworked logic monitor - Add const constructor to `StateTag`. - Atomic `LogicState` containing current state index and monitor status. - Updated logic monitor API. - Reworked internal of logic monitor. --- src/health_monitoring_lib/rust/lib.rs | 4 +- .../rust/logic/logic_monitor.rs | 356 +++++++++--------- .../rust/logic/logic_state.rs | 225 +++++++++++ src/health_monitoring_lib/rust/logic/mod.rs | 1 + src/health_monitoring_lib/rust/tag.rs | 13 + 5 files changed, 412 insertions(+), 187 deletions(-) create mode 100644 src/health_monitoring_lib/rust/logic/logic_state.rs diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 4f0f038f..ce8a56ca 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -372,8 +372,8 @@ mod tests { let state1 = StateTag::from("state1"); let state2 = StateTag::from("state2"); LogicMonitorBuilder::new(state1) - .add_state(state2) - .add_transition((state1, state2)) + .add_state(state1, &[state2]) + .add_state(state2, &[state1]) } #[test] diff --git a/src/health_monitoring_lib/rust/logic/logic_monitor.rs b/src/health_monitoring_lib/rust/logic/logic_monitor.rs index 5ac77c1e..af5e3924 100644 --- a/src/health_monitoring_lib/rust/logic/logic_monitor.rs +++ b/src/health_monitoring_lib/rust/logic/logic_monitor.rs @@ -13,45 +13,15 @@ use crate::common::{Monitor, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; use crate::log::{error, warn, ScoreDebug}; +use crate::logic::logic_state::LogicState; use crate::protected_memory::ProtectedMemoryAllocator; use crate::tag::{MonitorTag, StateTag}; use crate::HealthMonitorError; -use core::hash::{Hash, Hasher}; -use core::sync::atomic::{AtomicU64, AtomicU8, Ordering}; -use std::hash::DefaultHasher; +use core::hash::Hash; +use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -/// Hashed representation of state. -#[derive(PartialEq, Eq)] -struct HashedState(u64); - -impl From for HashedState { - fn from(value: u64) -> Self { - Self(value) - } -} - -impl From for u64 { - fn from(value: HashedState) -> Self { - value.0 - } -} - -impl From for HashedState { - fn from(value: StateTag) -> Self { - Self::from(&value) - } -} - -impl From<&StateTag> for HashedState { - fn from(value: &StateTag) -> Self { - let mut hasher = DefaultHasher::new(); - value.hash(&mut hasher); - Self(hasher.finish()) - } -} - /// Internal OK state representation. const OK_STATE: u8 = 0; @@ -81,41 +51,38 @@ impl From for LogicEvaluationError { } } +/// Node containing state data. +struct StateNode { + tag: StateTag, + allowed_targets: Vec, +} + /// Builder for [`LogicMonitor`]. #[derive(Debug)] pub struct LogicMonitorBuilder { /// Starting state. initial_state: StateTag, - /// List of allowed states. - allowed_states: Vec, - - /// List of allowed transitions between states. - allowed_transitions: Vec<(StateTag, StateTag)>, + /// State graph. + /// Contains state as a key and allowed transition targets as value. + state_graph: HashMap>, } impl LogicMonitorBuilder { /// Create a new [`LogicMonitorBuilder`]. /// - /// - `initial_state` - starting point, implicitly added to the list of allowed states. + /// - `initial_state` - starting point. pub fn new(initial_state: StateTag) -> Self { - let allowed_states = vec![initial_state]; Self { initial_state, - allowed_states, - allowed_transitions: Vec::new(), + state_graph: HashMap::new(), } } - /// Add allowed state. - pub fn add_state(mut self, state: StateTag) -> Self { - self.add_state_internal(state); - self - } - - /// Add allowed transition. - pub fn add_transition(mut self, transition: (StateTag, StateTag)) -> Self { - self.add_transition_internal(transition); + /// Add state along with allowed transitions. + /// If state already exist - it is overwritten. + pub fn add_state(mut self, state: StateTag, allowed_targets: &[StateTag]) -> Self { + self.add_state_internal(state, allowed_targets); self } @@ -128,45 +95,62 @@ impl LogicMonitorBuilder { monitor_tag: MonitorTag, _allocator: &ProtectedMemoryAllocator, ) -> Result { - // Check number of transitions. - if self.allowed_transitions.is_empty() { - error!("No transitions have been added. LogicMonitor cannot be created."); + // Check number of states. + if self.state_graph.is_empty() { + error!("No states have been added. LogicMonitor cannot be created."); return Err(HealthMonitorError::WrongState); } - // Check transitions are between allowed states. - for (from, to) in self.allowed_transitions.iter() { - if !self.allowed_states.contains(from) { - error!("Invalid transition definition - 'from' state is unknown: {:?}", from); - return Err(HealthMonitorError::InvalidArgument); + // Check transitions are between defined states. + for (state, allowed_targets) in self.state_graph.iter() { + for allowed_target in allowed_targets.iter() { + if !self.state_graph.contains_key(allowed_target) { + error!( + "Undefined target state. Origin: {:?}, target: {:?}", + state, allowed_target + ); + return Err(HealthMonitorError::InvalidArgument); + } } - if !self.allowed_states.contains(to) { - error!("Invalid transition definition - 'to' state is unknown: {:?}", to); - return Err(HealthMonitorError::InvalidArgument); + } + + // Convert builder-internal representation into monitor-internal representation. + let mut state_graph_vec = Vec::new(); + for (state, allowed_targets) in self.state_graph.into_iter() { + state_graph_vec.push(StateNode { + tag: state, + allowed_targets, + }); + } + + // Check initial state is defined, determine initial state index. + let mut initial_state_index_option = None; + for (index, node) in state_graph_vec.iter().enumerate() { + if node.tag == self.initial_state { + initial_state_index_option = Some(index); } } + let initial_state_index = match initial_state_index_option { + Some(index) => index, + None => { + error!("Undefined requested initial state: {:?}", self.initial_state); + return Err(HealthMonitorError::InvalidArgument); + }, + }; + let inner = Arc::new(LogicMonitorInner::new( monitor_tag, - self.initial_state, - self.allowed_states, - self.allowed_transitions, + initial_state_index, + state_graph_vec, )); Ok(LogicMonitor::new(inner)) } // FFI internals. - pub(crate) fn add_state_internal(&mut self, state: StateTag) { - if !self.allowed_states.contains(&state) { - self.allowed_states.push(state); - } - } - - pub(crate) fn add_transition_internal(&mut self, transition: (StateTag, StateTag)) { - if !self.allowed_transitions.contains(&transition) { - self.allowed_transitions.push(transition); - } + pub(crate) fn add_state_internal(&mut self, state: StateTag, allowed_targets: &[StateTag]) { + self.state_graph.insert(state, allowed_targets.to_vec()); } } @@ -182,6 +166,7 @@ impl LogicMonitor { } /// Perform transition to a new state. + /// On success, current state is returned. pub fn transition(&self, state: StateTag) -> Result { self.inner.transition(state) } @@ -202,103 +187,104 @@ struct LogicMonitorInner { /// Tag of this monitor. monitor_tag: MonitorTag, - /// Hashed current state. - current_state: AtomicU64, - - /// State of the monitor. - /// Contains zero for correct state. - /// Contains [`LogicEvaluationError`] if in erroneous state. - monitor_state: AtomicU8, + /// Current logic state. + logic_state: LogicState, - /// List of allowed states. - allowed_states: Vec, - - /// List of allowed transitions between states. - allowed_transitions: Vec<(StateTag, StateTag)>, + /// State graph. + /// Contains state and allowed targets. + state_graph: Vec, } impl MonitorEvaluator for LogicMonitorInner { fn evaluate(&self, _hmon_starting_point: Instant, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { - let monitor_state = self.monitor_state.load(Ordering::Relaxed); - if monitor_state != OK_STATE { - let error = LogicEvaluationError::from(monitor_state); - warn!("Invalid logic monitor state observed: {:?}", error); + let snapshot = self.logic_state.snapshot(); + if snapshot.monitor_status() != OK_STATE { + let error = LogicEvaluationError::from(snapshot.monitor_status()); + warn!("Logic monitor error observed: {:?}", error); on_error(&self.monitor_tag, error.into()); } } } impl LogicMonitorInner { - fn new( - monitor_tag: MonitorTag, - initial_state: StateTag, - allowed_states: Vec, - allowed_transitions: Vec<(StateTag, StateTag)>, - ) -> Self { - let current_state = AtomicU64::new(HashedState::from(initial_state).into()); - let monitor_state = AtomicU8::new(0); + fn new(monitor_tag: MonitorTag, initial_state_index: usize, state_graph: Vec) -> Self { + let logic_state = LogicState::new(initial_state_index); LogicMonitorInner { monitor_tag, - current_state, - monitor_state, - allowed_states, - allowed_transitions, + logic_state, + state_graph, } } - fn transition(&self, new_state: StateTag) -> Result { - // Get current state. - let current_state = self.state()?; - - // Check new state is valid. - if !self.allowed_states.contains(&new_state) { - // Move to `InvalidState` if requested state is not known. - warn!("Requested state transition to unknown state: {:?}", new_state); - let new_monitor_state = LogicEvaluationError::InvalidState; - self.monitor_state.store(new_monitor_state.into(), Ordering::Relaxed); - return Err(new_monitor_state); + fn find_node_by_index(&self, state_index: usize) -> Result<&StateNode, LogicEvaluationError> { + match self.state_graph.get(state_index) { + Some(node) => Ok(node), + None => Err(LogicEvaluationError::InvalidState), } + } + + fn find_index_by_tag(&self, state_tag: StateTag) -> Result { + for (index, state_node) in self.state_graph.iter().enumerate() { + if state_node.tag == state_tag { + return Ok(index); + } + } + + Err(LogicEvaluationError::InvalidState) + } - // Check transition is valid. - let transition = (current_state, new_state); - if !self.allowed_transitions.contains(&transition) { - // Move to `InvalidTransition` if requested transition is not known. + fn transition(&self, target_state: StateTag) -> Result { + // Load current monitor state. + let snapshot = self.logic_state.snapshot(); + + // Disallow operation in erroneous state. + if snapshot.monitor_status() != OK_STATE { + warn!("Current logic monitor state cannot be determined"); + return Err(LogicEvaluationError::InvalidState); + } + + // Get name and allowed targets of current state. + let current_state_index = snapshot.current_state_index(); + let current_state_node = self.find_node_by_index(current_state_index)?; + + // Check transition to a target state is valid. + if !current_state_node.allowed_targets.contains(&target_state) { + // Move to `InvalidTransition` if requested target state is not known. warn!( "Requested state transition is invalid: {:?} -> {:?}", - current_state, new_state + current_state_node.tag, target_state ); - let new_monitor_state = LogicEvaluationError::InvalidTransition; - self.monitor_state.store(new_monitor_state.into(), Ordering::Relaxed); - return Err(new_monitor_state); + let error = LogicEvaluationError::InvalidTransition; + let _ = self.logic_state.update(|mut current_state| { + current_state.set_monitor_status(error.into()); + Some(current_state) + }); + return Err(error); } - // Change state and return it. - let hashed_new_state = HashedState::from(new_state); - self.current_state.store(hashed_new_state.into(), Ordering::Relaxed); + // Find index of target state, then change current state. + let target_state_index = self.find_index_by_tag(target_state)?; + let _ = self.logic_state.update(|mut current_state| { + current_state.set_current_state_index(target_state_index); + Some(current_state) + }); - Ok(new_state) + Ok(target_state) } fn state(&self) -> Result { - // Current state cannot be determined. - if self.monitor_state.load(Ordering::Relaxed) != OK_STATE { + // Load current monitor state. + let snapshot = self.logic_state.snapshot(); + + // Disallow operation in erroneous state. + if snapshot.monitor_status() != OK_STATE { warn!("Current logic monitor state cannot be determined"); return Err(LogicEvaluationError::InvalidState); } // Find current state. - let hashed_state = HashedState::from(self.current_state.load(Ordering::Relaxed)); - let result = self - .allowed_states - .iter() - .find(|e| HashedState::from(*e) == hashed_state); - - // Return current state if found. - // `None` indicates logic error - it should not be possible to successfully change state into an unknown. - match result { - Some(state) => Ok(*state), - None => Err(LogicEvaluationError::InvalidState), - } + self.find_node_by_index(snapshot.current_state_index()) + .map(|node| node.tag) } } @@ -314,11 +300,10 @@ mod tests { #[test] fn logic_monitor_builder_new_succeeds() { - let from_state = StateTag::from("from"); - let builder = LogicMonitorBuilder::new(from_state); - assert_eq!(builder.initial_state, from_state); - assert_eq!(builder.allowed_states, vec![from_state]); - assert!(builder.allowed_transitions.is_empty()); + let initial_state = StateTag::from("initial"); + let builder = LogicMonitorBuilder::new(initial_state); + assert_eq!(builder.initial_state, initial_state); + assert!(builder.state_graph.is_empty()) } #[test] @@ -328,39 +313,43 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let result = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator); assert!(result.is_ok()); } #[test] - fn logic_monitor_builder_build_no_transitions() { + fn logic_monitor_builder_build_no_states() { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("logic_monitor"); - let from_state = StateTag::from("from"); - let result = LogicMonitorBuilder::new(from_state).build(monitor_tag, &allocator); + let initial_state = StateTag::from("initial"); + let result = LogicMonitorBuilder::new(initial_state).build(monitor_tag, &allocator); assert!(result.is_err_and(|e| e == HealthMonitorError::WrongState)); } #[test] - fn logic_monitor_builder_build_unknown_nodes() { + fn logic_monitor_builder_build_undefined_target() { let allocator = ProtectedMemoryAllocator {}; let monitor_tag = MonitorTag::from("logic_monitor"); let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); - - // Unknown "from". let result = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((StateTag::from("unknown"), to_state)) + .add_state(from_state, &[to_state]) .build(monitor_tag, &allocator); assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); + } - // Unknown "to". - let result = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, StateTag::from("unknown"))) + #[test] + fn logic_monitor_builder_build_undefined_initial_state() { + let allocator = ProtectedMemoryAllocator {}; + let monitor_tag = MonitorTag::from("logic_monitor"); + let initial_state = StateTag::from("initial"); + let from_state = StateTag::from("from"); + let to_state = StateTag::from("to"); + let result = LogicMonitorBuilder::new(initial_state) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator); assert!(result.is_err_and(|e| e == HealthMonitorError::InvalidArgument)); } @@ -372,8 +361,8 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); @@ -388,13 +377,13 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); let result = monitor.transition(StateTag::from("unknown")); - assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidState)); + assert!(result.is_err_and(|e| e == LogicEvaluationError::InvalidTransition)); } #[test] @@ -404,8 +393,8 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); @@ -425,10 +414,9 @@ mod tests { let state2: StateTag = StateTag::from("state2"); let state3 = StateTag::from("state3"); let monitor = LogicMonitorBuilder::new(state1) - .add_state(state2) - .add_state(state3) - .add_transition((state1, state2)) - .add_transition((state2, state3)) + .add_state(state1, &[state2]) + .add_state(state2, &[state3]) + .add_state(state3, &[]) .build(monitor_tag, &allocator) .unwrap(); @@ -444,10 +432,9 @@ mod tests { let state2: StateTag = StateTag::from("state2"); let state3 = StateTag::from("state3"); let monitor = LogicMonitorBuilder::new(state1) - .add_state(state2) - .add_state(state3) - .add_transition((state1, state2)) - .add_transition((state2, state3)) + .add_state(state1, &[state2]) + .add_state(state2, &[state3]) + .add_state(state3, &[]) .build(monitor_tag, &allocator) .unwrap(); @@ -471,8 +458,8 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); @@ -491,8 +478,8 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); let hmon_starting_point = Instant::now(); @@ -515,8 +502,8 @@ mod tests { let from_state = StateTag::from("from"); let to_state = StateTag::from("to"); let monitor = LogicMonitorBuilder::new(from_state) - .add_state(to_state) - .add_transition((from_state, to_state)) + .add_state(from_state, &[to_state]) + .add_state(to_state, &[]) .build(monitor_tag, &allocator) .unwrap(); let hmon_starting_point = Instant::now(); @@ -533,7 +520,7 @@ mod tests { .evaluate(hmon_starting_point, &mut |monitor_tag_internal, error| { error_happened = true; assert_eq!(*monitor_tag_internal, monitor_tag); - assert_eq!(error, LogicEvaluationError::InvalidState.into()) + assert_eq!(error, LogicEvaluationError::InvalidTransition.into()) }); assert!(error_happened); } @@ -546,10 +533,9 @@ mod tests { let state2: StateTag = StateTag::from("state2"); let state3 = StateTag::from("state3"); let monitor = LogicMonitorBuilder::new(state1) - .add_state(state2) - .add_state(state3) - .add_transition((state1, state2)) - .add_transition((state2, state3)) + .add_state(state1, &[state2]) + .add_state(state2, &[state3]) + .add_state(state3, &[]) .build(monitor_tag, &allocator) .unwrap(); let hmon_starting_point = Instant::now(); diff --git a/src/health_monitoring_lib/rust/logic/logic_state.rs b/src/health_monitoring_lib/rust/logic/logic_state.rs new file mode 100644 index 00000000..3157fb08 --- /dev/null +++ b/src/health_monitoring_lib/rust/logic/logic_state.rs @@ -0,0 +1,225 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +#[cfg(not(loom))] +use core::sync::atomic::{AtomicU64, Ordering}; +#[cfg(loom)] +use loom::sync::atomic::{AtomicU64, Ordering}; + +/// Snapshot of a logic state. +/// Layout (u64) = | current state index: 56 bits | monitor status: u8 | +#[derive(Clone, Copy)] +pub struct LogicStateSnapshot(u64); + +const INDEX_MASK: u64 = 0xFFFFFFFF_FFFFFF00; +const INDEX_OFFSET: u32 = u8::BITS; +const STATUS_MASK: u64 = 0xFF; + +impl LogicStateSnapshot { + /// Create a new snapshot. + pub fn new(initial_state_index: usize) -> Self { + let mut snapshot = Self(0); + snapshot.set_current_state_index(initial_state_index); + snapshot + } + + /// Return underlying data. + pub fn as_u64(&self) -> u64 { + self.0 + } + + /// Current state index. + pub fn current_state_index(&self) -> usize { + ((self.0 & INDEX_MASK) >> INDEX_OFFSET) as usize + } + + /// Set current state index. + /// Value is 56-bit, max accepted value is 0x00FFFFFF_FFFFFFFF. + pub fn set_current_state_index(&mut self, value: usize) { + assert!(value < 1 << 56, "provided state index is out of range"); + self.0 = ((value as u64) << INDEX_OFFSET) | (self.0 & !INDEX_MASK) + } + + /// Monitor status. + /// - zero if healthy. + /// - `LogicEvaluationError` if not. + pub fn monitor_status(&self) -> u8 { + (self.0 & STATUS_MASK) as u8 + } + + /// Set monitor status. + pub fn set_monitor_status(&mut self, value: u8) { + self.0 = (value as u64) | (self.0 & !STATUS_MASK); + } +} + +impl From for LogicStateSnapshot { + fn from(value: u64) -> Self { + Self(value) + } +} + +/// Atomic representation of [`LogicStateSnapshot`]. +pub struct LogicState(AtomicU64); + +impl LogicState { + /// Create a new [`LogicState`]. + pub fn new(initial_state_index: usize) -> Self { + let snapshot = LogicStateSnapshot::new(initial_state_index); + Self(AtomicU64::new(snapshot.as_u64())) + } + + /// Return a snapshot of the current logic state. + #[allow(dead_code)] + pub fn snapshot(&self) -> LogicStateSnapshot { + LogicStateSnapshot::from(self.0.load(Ordering::Acquire)) + } + + /// Update the logic state using the provided closure. + /// Closure receives the current state and should return an [`Option`] containing a new state. + /// If [`None`] is returned then the state was not updated. + pub fn update Option>( + &self, + mut f: F, + ) -> Result { + self.0 + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |prev| { + let snapshot = LogicStateSnapshot::from(prev); + f(snapshot).map(|new_snapshot: LogicStateSnapshot| new_snapshot.as_u64()) + }) + .map(LogicStateSnapshot::from) + .map_err(LogicStateSnapshot::from) + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use crate::logic::logic_state::{LogicState, LogicStateSnapshot}; + use core::sync::atomic::Ordering; + + #[test] + fn snapshot_new_succeeds() { + let initial_state_index = 4321; + let state = LogicStateSnapshot::new(initial_state_index); + + assert_eq!(state.as_u64(), (initial_state_index as u64) << u8::BITS); + assert_eq!(state.current_state_index(), initial_state_index); + assert_eq!(state.monitor_status(), 0); + } + + #[test] + fn snapshot_from_u64_zero() { + let state = LogicStateSnapshot::from(0); + + assert_eq!(state.as_u64(), 0x00); + assert_eq!(state.current_state_index(), 0); + assert_eq!(state.monitor_status(), 0); + } + + #[test] + fn snapshot_from_u64_valid() { + let state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + + assert_eq!(state.as_u64(), 0xDEADBEEF_DEADBEEF); + assert_eq!(state.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); + assert_eq!(state.monitor_status(), 0xEF); + } + + #[test] + fn snapshot_from_u64_max() { + let state = LogicStateSnapshot::from(u64::MAX); + + assert_eq!(state.as_u64(), u64::MAX); + assert_eq!(state.current_state_index(), (u64::MAX >> u8::BITS) as usize); + assert_eq!(state.monitor_status(), u8::MAX); + } + + #[test] + fn snapshot_set_current_state_index_valid() { + let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_current_state_index(0x00CAFEBA_DCAFEBAD); + + assert_eq!(state.current_state_index(), 0x00CAFEBA_DCAFEBAD); + + // Check other parameters unchanged. + assert_eq!(state.monitor_status(), 0xEF); + } + + #[test] + #[should_panic(expected = "provided state index is out of range")] + fn snapshot_set_heartbeat_timestamp_out_of_range() { + let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_current_state_index(0x01000000_00000000); + } + + #[test] + fn snapshot_set_monitor_status_valid() { + let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_monitor_status(0xFA); + + assert_eq!(state.monitor_status(), 0xFA); + + // Check other parameters unchanged. + assert_eq!(state.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); + } + + #[test] + fn state_new() { + let initial_state_index = 4321; + let state = LogicState::new(initial_state_index); + assert_eq!( + state.0.load(Ordering::Relaxed), + (initial_state_index as u64) << u8::BITS + ); + } + + #[test] + fn state_snapshot() { + let initial_state_index = 4321; + let state = LogicState::new(initial_state_index); + assert_eq!(state.snapshot().as_u64(), (initial_state_index as u64) << u8::BITS); + } + + #[test] + fn state_update_some() { + let state = LogicState::new(0); + let _ = state.update(|prev_snapshot| { + // Make sure state is as expected. + assert_eq!(prev_snapshot.as_u64(), 0x00); + assert_eq!(prev_snapshot.current_state_index(), 0); + assert_eq!(prev_snapshot.monitor_status(), 0); + + Some(LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF)) + }); + + let _ = state.update(|prev_snapshot| { + // Make sure state is as expected. + assert_eq!(prev_snapshot.as_u64(), 0xDEADBEEF_DEADBEEF); + assert_eq!(prev_snapshot.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); + assert_eq!(prev_snapshot.monitor_status(), 0xEF); + + Some(LogicStateSnapshot::from(0)) + }); + + assert_eq!(state.snapshot().as_u64(), 0); + } + + #[test] + fn state_update_none() { + let state = LogicState::new(4321); + let _ = state.update(|_| Some(LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF))); + let _ = state.update(|_| None); + + assert_eq!(state.snapshot().as_u64(), 0xDEADBEEF_DEADBEEF); + } +} diff --git a/src/health_monitoring_lib/rust/logic/mod.rs b/src/health_monitoring_lib/rust/logic/mod.rs index 4323bcf7..6242e7f6 100644 --- a/src/health_monitoring_lib/rust/logic/mod.rs +++ b/src/health_monitoring_lib/rust/logic/mod.rs @@ -12,5 +12,6 @@ // ******************************************************************************* mod logic_monitor; +mod logic_state; pub use logic_monitor::{LogicEvaluationError, LogicMonitor, LogicMonitorBuilder}; diff --git a/src/health_monitoring_lib/rust/tag.rs b/src/health_monitoring_lib/rust/tag.rs index c1fb2504..71be7f4e 100644 --- a/src/health_monitoring_lib/rust/tag.rs +++ b/src/health_monitoring_lib/rust/tag.rs @@ -177,6 +177,12 @@ impl From<&str> for DeadlineTag { #[repr(C)] pub struct StateTag(Tag); +impl StateTag { + pub const fn new(value: &str) -> Self { + StateTag(Tag::new(value)) + } +} + impl fmt::Debug for StateTag { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // SAFETY: the underlying data was created from a valid `&str`. @@ -449,6 +455,13 @@ mod tests { compare_tag(tag.0, example_str); } + #[test] + fn state_tag_new() { + const EXAMPLE_STR: &str = "EXAMPLE"; + const TAG: StateTag = StateTag::new(EXAMPLE_STR); + compare_tag(TAG.0, EXAMPLE_STR); + } + #[test] fn state_tag_debug() { let example_str = "EXAMPLE"; From fb1c8544c68e08e073f9dc5c2791aa0a5e691aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Mon, 9 Mar 2026 12:13:39 +0100 Subject: [PATCH 3/3] hmon: logic monitor post-review fixes - Move `loom` imports to `common`. - Remove `Sync` from `LogicMonitor`. - `PhantomUnsync` added to `common`. - `LogicEvaluationError` -> replace `From` with `TryFrom`. - Replace `monitor_status` and `set_monitor_status` type. - From `u8` to `LogicEvaluationError`. - Replace `update` with `swap` in `LogicState`. --- src/health_monitoring_lib/rust/common.rs | 10 ++ .../rust/heartbeat/heartbeat_state.rs | 6 +- .../rust/logic/logic_monitor.rs | 51 +++++---- .../rust/logic/logic_state.rs | 106 ++++++++---------- 4 files changed, 86 insertions(+), 87 deletions(-) diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index eb3904e1..be1578b9 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -16,7 +16,9 @@ use crate::heartbeat::HeartbeatEvaluationError; use crate::log::ScoreDebug; use crate::logic::LogicEvaluationError; use crate::tag::MonitorTag; +use core::cell::Cell; use core::hash::Hash; +use core::marker::PhantomData; use core::time::Duration; use std::sync::Arc; use std::time::Instant; @@ -151,6 +153,14 @@ where T::try_from(millis).expect("Duration is too big for the integer of this type") } +/// Marker for disabling [`Sync`]. +pub(crate) type PhantomUnsync = PhantomData>; + +#[cfg(not(loom))] +pub use core::sync::atomic::{AtomicU64, Ordering}; +#[cfg(loom)] +pub use loom::sync::atomic::{AtomicU64, Ordering}; + #[cfg(all(test, not(loom)))] mod tests { use crate::common::{duration_to_int, time_offset, TimeRange}; diff --git a/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs b/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs index 3bfa9013..06ae7727 100644 --- a/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs +++ b/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs @@ -11,13 +11,9 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* +use crate::common::{AtomicU64, Ordering}; use core::cmp::min; -#[cfg(not(loom))] -use core::sync::atomic::{AtomicU64, Ordering}; -#[cfg(loom)] -use loom::sync::atomic::{AtomicU64, Ordering}; - /// Snapshot of a heartbeat state. /// Layout (u64) = | heartbeat timestamp: 62 bits | heartbeat counter: 2 bits | #[derive(Clone, Copy, Default)] diff --git a/src/health_monitoring_lib/rust/logic/logic_monitor.rs b/src/health_monitoring_lib/rust/logic/logic_monitor.rs index af5e3924..ce82f8ec 100644 --- a/src/health_monitoring_lib/rust/logic/logic_monitor.rs +++ b/src/health_monitoring_lib/rust/logic/logic_monitor.rs @@ -11,19 +11,20 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::common::{Monitor, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator}; +use crate::common::{Monitor, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, PhantomUnsync}; use crate::log::{error, warn, ScoreDebug}; use crate::logic::logic_state::LogicState; use crate::protected_memory::ProtectedMemoryAllocator; use crate::tag::{MonitorTag, StateTag}; use crate::HealthMonitorError; use core::hash::Hash; +use core::marker::PhantomData; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; /// Internal OK state representation. -const OK_STATE: u8 = 0; +pub(super) const OK_STATE: u8 = 0; /// Logic evaluation errors. #[repr(u8)] @@ -33,6 +34,8 @@ pub enum LogicEvaluationError { InvalidState = OK_STATE + 1, /// Transition is invalid. InvalidTransition, + /// Unknown error. + UnmappedError, } impl From for u8 { @@ -41,12 +44,18 @@ impl From for u8 { } } -impl From for LogicEvaluationError { - fn from(value: u8) -> Self { +impl TryFrom for LogicEvaluationError { + type Error = (); + + fn try_from(value: u8) -> Result { + const INVALID_STATE: u8 = LogicEvaluationError::InvalidState as u8; + const INVALID_TRANSITION: u8 = LogicEvaluationError::InvalidTransition as u8; + const UNMAPPED_ERROR: u8 = LogicEvaluationError::UnmappedError as u8; match value { - value if value == LogicEvaluationError::InvalidState as u8 => LogicEvaluationError::InvalidState, - value if value == LogicEvaluationError::InvalidTransition as u8 => LogicEvaluationError::InvalidTransition, - _ => panic!("Invalid underlying value of logic evaluation error."), + INVALID_STATE => Ok(LogicEvaluationError::InvalidState), + INVALID_TRANSITION => Ok(LogicEvaluationError::InvalidTransition), + UNMAPPED_ERROR => Ok(LogicEvaluationError::UnmappedError), + _ => Err(()), } } } @@ -157,12 +166,16 @@ impl LogicMonitorBuilder { /// Logic monitor. pub struct LogicMonitor { inner: Arc, + _unsync: PhantomUnsync, } impl LogicMonitor { /// Create a new [`LogicMonitor`] instance. fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + _unsync: PhantomData, + } } /// Perform transition to a new state. @@ -198,8 +211,7 @@ struct LogicMonitorInner { impl MonitorEvaluator for LogicMonitorInner { fn evaluate(&self, _hmon_starting_point: Instant, on_error: &mut dyn FnMut(&MonitorTag, MonitorEvaluationError)) { let snapshot = self.logic_state.snapshot(); - if snapshot.monitor_status() != OK_STATE { - let error = LogicEvaluationError::from(snapshot.monitor_status()); + if let Err(error) = snapshot.monitor_status() { warn!("Logic monitor error observed: {:?}", error); on_error(&self.monitor_tag, error.into()); } @@ -235,10 +247,10 @@ impl LogicMonitorInner { fn transition(&self, target_state: StateTag) -> Result { // Load current monitor state. - let snapshot = self.logic_state.snapshot(); + let mut snapshot = self.logic_state.snapshot(); // Disallow operation in erroneous state. - if snapshot.monitor_status() != OK_STATE { + if snapshot.monitor_status().is_err() { warn!("Current logic monitor state cannot be determined"); return Err(LogicEvaluationError::InvalidState); } @@ -254,20 +266,17 @@ impl LogicMonitorInner { "Requested state transition is invalid: {:?} -> {:?}", current_state_node.tag, target_state ); + let error = LogicEvaluationError::InvalidTransition; - let _ = self.logic_state.update(|mut current_state| { - current_state.set_monitor_status(error.into()); - Some(current_state) - }); + snapshot.set_monitor_status(error); + let _ = self.logic_state.swap(snapshot); return Err(error); } // Find index of target state, then change current state. let target_state_index = self.find_index_by_tag(target_state)?; - let _ = self.logic_state.update(|mut current_state| { - current_state.set_current_state_index(target_state_index); - Some(current_state) - }); + snapshot.set_current_state_index(target_state_index); + let _ = self.logic_state.swap(snapshot); Ok(target_state) } @@ -277,7 +286,7 @@ impl LogicMonitorInner { let snapshot = self.logic_state.snapshot(); // Disallow operation in erroneous state. - if snapshot.monitor_status() != OK_STATE { + if snapshot.monitor_status().is_err() { warn!("Current logic monitor state cannot be determined"); return Err(LogicEvaluationError::InvalidState); } diff --git a/src/health_monitoring_lib/rust/logic/logic_state.rs b/src/health_monitoring_lib/rust/logic/logic_state.rs index 3157fb08..bef2de0c 100644 --- a/src/health_monitoring_lib/rust/logic/logic_state.rs +++ b/src/health_monitoring_lib/rust/logic/logic_state.rs @@ -11,10 +11,9 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -#[cfg(not(loom))] -use core::sync::atomic::{AtomicU64, Ordering}; -#[cfg(loom)] -use loom::sync::atomic::{AtomicU64, Ordering}; +use crate::common::{AtomicU64, Ordering}; +use crate::logic::logic_monitor::OK_STATE; +use crate::logic::LogicEvaluationError; /// Snapshot of a logic state. /// Layout (u64) = | current state index: 56 bits | monitor status: u8 | @@ -53,12 +52,17 @@ impl LogicStateSnapshot { /// Monitor status. /// - zero if healthy. /// - `LogicEvaluationError` if not. - pub fn monitor_status(&self) -> u8 { - (self.0 & STATUS_MASK) as u8 + pub fn monitor_status(&self) -> Result<(), LogicEvaluationError> { + let value = (self.0 & STATUS_MASK) as u8; + if value == OK_STATE { + Ok(()) + } else { + Err(value.try_into().map_err(|_| LogicEvaluationError::UnmappedError)?) + } } /// Set monitor status. - pub fn set_monitor_status(&mut self, value: u8) { + pub fn set_monitor_status(&mut self, value: LogicEvaluationError) { self.0 = (value as u64) | (self.0 & !STATUS_MASK); } } @@ -85,26 +89,16 @@ impl LogicState { LogicStateSnapshot::from(self.0.load(Ordering::Acquire)) } - /// Update the logic state using the provided closure. - /// Closure receives the current state and should return an [`Option`] containing a new state. - /// If [`None`] is returned then the state was not updated. - pub fn update Option>( - &self, - mut f: F, - ) -> Result { - self.0 - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |prev| { - let snapshot = LogicStateSnapshot::from(prev); - f(snapshot).map(|new_snapshot: LogicStateSnapshot| new_snapshot.as_u64()) - }) - .map(LogicStateSnapshot::from) - .map_err(LogicStateSnapshot::from) + /// Store a new [`LogicStateSnapshot`] and return the previous one. + pub fn swap(&self, new: LogicStateSnapshot) -> LogicStateSnapshot { + self.0.swap(new.as_u64(), Ordering::AcqRel).into() } } #[cfg(all(test, not(loom)))] mod tests { use crate::logic::logic_state::{LogicState, LogicStateSnapshot}; + use crate::logic::LogicEvaluationError; use core::sync::atomic::Ordering; #[test] @@ -114,7 +108,7 @@ mod tests { assert_eq!(state.as_u64(), (initial_state_index as u64) << u8::BITS); assert_eq!(state.current_state_index(), initial_state_index); - assert_eq!(state.monitor_status(), 0); + assert!(state.monitor_status().is_ok()); } #[test] @@ -123,16 +117,18 @@ mod tests { assert_eq!(state.as_u64(), 0x00); assert_eq!(state.current_state_index(), 0); - assert_eq!(state.monitor_status(), 0); + assert!(state.monitor_status().is_ok()); } #[test] fn snapshot_from_u64_valid() { - let state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + let state = LogicStateSnapshot::from(0xDEADBEEF_DEADBE01); - assert_eq!(state.as_u64(), 0xDEADBEEF_DEADBEEF); - assert_eq!(state.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); - assert_eq!(state.monitor_status(), 0xEF); + assert_eq!(state.as_u64(), 0xDEADBEEF_DEADBE01); + assert_eq!(state.current_state_index(), 0xDEADBEEF_DEADBE01 >> u8::BITS); + assert!(state + .monitor_status() + .is_err_and(|e| e == LogicEvaluationError::InvalidState)); } #[test] @@ -141,18 +137,20 @@ mod tests { assert_eq!(state.as_u64(), u64::MAX); assert_eq!(state.current_state_index(), (u64::MAX >> u8::BITS) as usize); - assert_eq!(state.monitor_status(), u8::MAX); + assert!(state + .monitor_status() + .is_err_and(|e| e == LogicEvaluationError::UnmappedError)); } #[test] fn snapshot_set_current_state_index_valid() { - let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); + let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBE00); state.set_current_state_index(0x00CAFEBA_DCAFEBAD); assert_eq!(state.current_state_index(), 0x00CAFEBA_DCAFEBAD); // Check other parameters unchanged. - assert_eq!(state.monitor_status(), 0xEF); + assert!(state.monitor_status().is_ok()); } #[test] @@ -165,9 +163,11 @@ mod tests { #[test] fn snapshot_set_monitor_status_valid() { let mut state = LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF); - state.set_monitor_status(0xFA); + state.set_monitor_status(LogicEvaluationError::InvalidTransition); - assert_eq!(state.monitor_status(), 0xFA); + assert!(state + .monitor_status() + .is_err_and(|e| e == LogicEvaluationError::InvalidTransition)); // Check other parameters unchanged. assert_eq!(state.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); @@ -191,35 +191,19 @@ mod tests { } #[test] - fn state_update_some() { + fn state_swap() { let state = LogicState::new(0); - let _ = state.update(|prev_snapshot| { - // Make sure state is as expected. - assert_eq!(prev_snapshot.as_u64(), 0x00); - assert_eq!(prev_snapshot.current_state_index(), 0); - assert_eq!(prev_snapshot.monitor_status(), 0); - - Some(LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF)) - }); - - let _ = state.update(|prev_snapshot| { - // Make sure state is as expected. - assert_eq!(prev_snapshot.as_u64(), 0xDEADBEEF_DEADBEEF); - assert_eq!(prev_snapshot.current_state_index(), 0xDEADBEEF_DEADBEEF >> u8::BITS); - assert_eq!(prev_snapshot.monitor_status(), 0xEF); - - Some(LogicStateSnapshot::from(0)) - }); - - assert_eq!(state.snapshot().as_u64(), 0); - } - - #[test] - fn state_update_none() { - let state = LogicState::new(4321); - let _ = state.update(|_| Some(LogicStateSnapshot::from(0xDEADBEEF_DEADBEEF))); - let _ = state.update(|_| None); - - assert_eq!(state.snapshot().as_u64(), 0xDEADBEEF_DEADBEEF); + let prev_snapshot = state.swap(LogicStateSnapshot::from(0xDEADBEEF_DEADBE02)); + + assert_eq!(prev_snapshot.as_u64(), 0x00); + assert_eq!(prev_snapshot.current_state_index(), 0); + assert!(prev_snapshot.monitor_status().is_ok()); + + let curr_snapshot = state.snapshot(); + assert_eq!(curr_snapshot.as_u64(), 0xDEADBEEF_DEADBE02); + assert_eq!(curr_snapshot.current_state_index(), 0xDEADBEEF_DEADBE02 >> u8::BITS); + assert!(curr_snapshot + .monitor_status() + .is_err_and(|e| e == LogicEvaluationError::InvalidTransition)); } }