Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ current event.
- `on_error_execution()` works via naming convention but **only** when a transition for
`error.execution` is declared — it is NOT a generic callback.

### Thread safety

- The sync engine is **thread-safe**: multiple threads can send events to the same SM instance
concurrently. The processing loop uses a `threading.Lock` so at most one thread executes
transitions at a time. Event queues use `PriorityQueue` (stdlib, thread-safe).
- **Do not replace `PriorityQueue`** with non-thread-safe alternatives (e.g., `collections.deque`,
plain `list`) — this would break concurrent access guarantees.
- Stress tests in `tests/test_threading.py::TestThreadSafety` exercise real contention with
barriers and multiple sender threads. Any change to queue or locking internals must pass these.

### Invoke (`<invoke>`)

- `invoke.py` — `InvokeManager` on the engine manages the lifecycle: `mark_for_invoke()`,
Expand Down Expand Up @@ -127,6 +137,16 @@ timeout 120 uv run pytest -n 4

Testes normally run under 60s (~40s on average), so take a closer look if they take longer, it can be a regression.

### Debug logging

`log_cli_level` defaults to `WARNING` in `pyproject.toml`. The engine caches a no-op
for `logger.debug` at init time — running tests with `DEBUG` would bypass this
optimization and inflate benchmark numbers. To enable debug logs for a specific run:

```bash
uv run pytest -o log_cli_level=DEBUG tests/test_something.py
```

When analyzing warnings or extensive output, run the tests **once** saving the output to a file
(`> /tmp/pytest-output.txt 2>&1`), then analyze the file — instead of running the suite
repeatedly with different greps.
Expand Down
47 changes: 47 additions & 0 deletions docs/processing_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,50 @@ The machine starts, enters `trying` (attempt 1), and the eventless
self-transition keeps firing as long as `can_retry()` returns `True`. Once
the limit is reached, the second eventless transition fires — all within a
single macrostep triggered by initialization.


(thread-safety)=

## Thread safety

State machines are **thread-safe** for concurrent event sending. Multiple threads
can call `send()` or trigger events on the **same state machine instance**
simultaneously — the engine guarantees correct behavior through its internal
locking mechanism.

### How it works

The processing loop uses a non-blocking lock (`threading.Lock`). When a thread
sends an event:

1. The event is placed on the **external queue** (backed by a thread-safe
`PriorityQueue` from the standard library).
2. If no other thread is currently running the processing loop, the sending
thread acquires the lock and processes all queued events.
3. If another thread is already processing, the event is simply enqueued and
will be processed by the thread that holds the lock — no event is lost.

This means that **at most one thread executes transitions at any time**, preserving
the run-to-completion (RTC) guarantee while allowing safe concurrent access.

### What is safe

- **Multiple threads sending events** to the same state machine instance.
- **Reading state** (`current_state_value`, `configuration`) from any thread
while events are being processed. Note that transient `None` values may be
observed for `current_state_value` during configuration updates when using
[`atomic_configuration_update`](behaviour.md#atomic_configuration_update) `= False`
(the default on `StateChart`, SCXML-compliant). With `atomic_configuration_update = True`
(the default on `StateMachine`), the configuration is updated atomically at
the end of the microstep, so `None` is not observed.
- **Invoke handlers** running in background threads or thread executors
communicate with the parent machine via the thread-safe event queue.

### What to avoid

- **Do not share a state machine instance across threads with the async engine**
unless you ensure only one event loop drives the machine. The async engine is
designed for `asyncio` concurrency, not thread-based concurrency.
- **Callbacks execute in the processing thread**, not in the thread that sent
the event. Design callbacks accordingly (e.g., use locks if they access
shared external state).
16 changes: 16 additions & 0 deletions docs/releases/3.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ See {ref}`diagram:Sphinx directive` for full documentation.
[#589](https://github.com/fgmacedo/python-statemachine/pull/589).


### Performance: 5x–7x faster event processing

The engine's hot paths have been systematically profiled and optimized, resulting in
**4.7x–7.7x faster event throughput** and **1.9x–2.6x faster setup** across all
machine types. All optimizations are internal — no public API changes.
See [#592](https://github.com/fgmacedo/python-statemachine/pull/592) for details.


### Thread safety documentation

The sync engine is thread-safe: multiple threads can send events to the same state
machine instance concurrently. This is now documented in the
{ref}`processing model <thread-safety>` and verified by stress tests.
[#592](https://github.com/fgmacedo/python-statemachine/pull/592).


### Bugfixes in 3.1.0

- Fixes silent misuse of `Event()` with multiple positional arguments. Passing more than one
Expand Down
15 changes: 13 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ markers = [
]
python_files = ["tests.py", "test_*.py", "*_tests.py"]
xfail_strict = true
log_cli_level = "DEBUG"
# Log level WARNING by default; the engine caches a no-op for logger.debug at
# init time, so DEBUG here would bypass that optimization and slow benchmarks.
# To enable DEBUG logging for a specific test run:
# uv run pytest -o log_cli_level=DEBUG
log_cli_level = "WARNING"
log_cli_format = "%(relativeCreated)6.0fms %(threadName)-18s %(name)-35s %(message)s"
log_cli_date_format = "%H:%M:%S"
asyncio_default_fixture_loop_scope = "module"
Expand Down Expand Up @@ -131,7 +135,14 @@ disable_error_code = "annotation-unchecked"
mypy_path = "$MYPY_CONFIG_FILE_DIR/tests/django_project"

[[tool.mypy.overrides]]
module = ['django.*', 'pytest.*', 'pydot.*', 'sphinx_gallery.*', 'docutils.*', 'sphinx.*']
module = [
'django.*',
'pytest.*',
'pydot.*',
'sphinx_gallery.*',
'docutils.*',
'sphinx.*',
]
ignore_missing_imports = true

[tool.ruff]
Expand Down
159 changes: 159 additions & 0 deletions statemachine/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Mapping
from typing import MutableSet

from .exceptions import InvalidStateValue
from .i18n import _
from .orderedset import OrderedSet

_SENTINEL = object()

if TYPE_CHECKING:
from .state import State


class Configuration:
"""Encapsulates the dual representation of the active state configuration.

Internally, ``current_state_value`` is either a scalar (single active state)
or an ``OrderedSet`` (parallel regions). This class hides that detail behind
a uniform interface for reading, mutating, and caching the resolved
``OrderedSet[State]``.
"""

__slots__ = (
"_instance_states",
"_model",
"_state_field",
"_states_map",
"_cached",
"_cached_value",
)

def __init__(
self,
instance_states: "Mapping[str, State]",
model: Any,
state_field: str,
states_map: "Dict[Any, State]",
):
self._instance_states = instance_states
self._model = model
self._state_field = state_field
self._states_map = states_map
self._cached: "OrderedSet[State] | None" = None
self._cached_value: Any = _SENTINEL

# -- Raw value (persisted on the model) ------------------------------------

@property
def value(self) -> Any:
"""The raw state value stored on the model (scalar or ``OrderedSet``)."""
return getattr(self._model, self._state_field, None)

@value.setter
def value(self, val: Any):
self._invalidate()
if val is not None and not isinstance(val, MutableSet) and val not in self._states_map:
raise InvalidStateValue(val)
setattr(self._model, self._state_field, val)

@property
def values(self) -> OrderedSet[Any]:
"""The set of raw state values currently active."""
v = self.value
if isinstance(v, OrderedSet):
return v
return OrderedSet([v])

# -- Resolved states -------------------------------------------------------

@property
def states(self) -> "OrderedSet[State]":
"""The set of currently active :class:`State` instances (cached)."""
csv = self.value
if self._cached is not None and self._cached_value is csv:
return self._cached
if csv is None:
return OrderedSet()

instance_states = self._instance_states
if not isinstance(csv, MutableSet):
result = OrderedSet([instance_states[self._states_map[csv].id]])
else:
result = OrderedSet([instance_states[self._states_map[v].id] for v in csv])

self._cached = result
self._cached_value = csv
return result

@states.setter
def states(self, new_configuration: "OrderedSet[State]"):
if len(new_configuration) == 0:
self.value = None
elif len(new_configuration) == 1:
self.value = next(iter(new_configuration)).value
else:
self.value = OrderedSet(s.value for s in new_configuration)

# -- Incremental mutation (used by the engine) -----------------------------

def add(self, state: "State"):
"""Add *state* to the configuration, maintaining the dual representation."""
csv = self.value
if csv is None:
self.value = state.value
elif isinstance(csv, MutableSet):
csv.add(state.value)
self._invalidate()
else:
self.value = OrderedSet([csv, state.value])

def discard(self, state: "State"):
"""Remove *state* from the configuration, normalizing back to scalar."""
csv = self.value
if isinstance(csv, MutableSet):
csv.discard(state.value)
self._invalidate()
if len(csv) == 1:
self.value = next(iter(csv))
elif len(csv) == 0:
self.value = None
elif csv == state.value:
self.value = None

# -- Deprecated v2 compat --------------------------------------------------

@property
def current_state(self) -> "State | OrderedSet[State]":
"""Resolve the current state with validation.

Unlike ``states`` (which returns an empty set for ``None``), this
raises ``InvalidStateValue`` when the value is ``None`` or not
found in ``states_map`` — matching the v2 ``current_state`` contract.
"""
csv = self.value
if csv is None:
raise InvalidStateValue(
csv,
_(
"There's no current state set. In async code, "
"did you activate the initial state? "
"(e.g., `await sm.activate_initial_state()`)"
),
)
try:
config = self.states
if len(config) == 1:
return next(iter(config))
return config
except KeyError as err:
raise InvalidStateValue(csv) from err

# -- Internal --------------------------------------------------------------

def _invalidate(self):
self._cached = None
self._cached_value = _SENTINEL
Loading
Loading