From 581bfa50846cd927a86719badc5ee2313b77bb33 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Sat, 10 Jan 2026 13:25:30 +0000 Subject: [PATCH 01/14] Howl --- README.md | 6 ++++ pyproject.toml | 3 +- src/saluki/howl.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++ src/saluki/main.py | 36 ++++++++++++++++++-- 4 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 src/saluki/howl.py diff --git a/README.md b/README.md index 04e27db..5df7a65 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,12 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. +## `howl` - Produce fake `ev44` messages to a topic + +``` +saluki howl mybroker:9092/dest_topic --events-per-frame 200 --frames-per-second 50 --tof-peak 10000000 --tof-sigma 5000000 --det-min 0 --det-max 500 +``` + # Developer setup `pip install -e .[dev]` diff --git a/pyproject.toml b/pyproject.toml index a74c110..2326bf2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,8 @@ dependencies = [ "ess-streaming-data-types", "confluent-kafka>=2.12.1", # for produce_batch in play() "python-dateutil", - "tzdata" + "tzdata", + "numpy", ] readme = {file = "README.md", content-type = "text/markdown"} license-files = ["LICENSE"] diff --git a/src/saluki/howl.py b/src/saluki/howl.py new file mode 100644 index 0000000..d06774e --- /dev/null +++ b/src/saluki/howl.py @@ -0,0 +1,82 @@ +import logging +import time + +import numpy as np +from confluent_kafka import Producer +from streaming_data_types import serialise_ev44 + +logger = logging.getLogger("saluki") + +RNG = np.random.default_rng() + + +def generate_fake_ev44( + msg_id: int, + events_per_frame: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> bytes: + detector_ids = np.random.randint(low=det_min, high=det_max, size=events_per_frame) + tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) + + return serialise_ev44( + source_name="saluki", + reference_time=[time.time() * 1_000_000_000], + message_id=msg_id, + reference_time_index=[0], + time_of_flight=tofs, + pixel_id=detector_ids, + ) + + +def howl( + broker: str, + topic: str, + events_per_frame: int, + frames_per_second: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> None: + """ + Prints the broker and topic metadata for a given broker. + If a topic is given, only this topic's partitions and watermarks will be printed. + :param broker: The broker address including port number. + :param topic: Optional topic to filter information to. + """ + + producer = Producer( + { + "bootstrap.servers": broker, + } + ) + + target_frame_time = 1 / frames_per_second + + msg_id = 0 + + ev44_size = len(generate_fake_ev44(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)) + rate_bytes_per_sec = ev44_size * frames_per_second + rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 + logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} MBit/s") + + while True: + start_time = time.time() + target_end_time = start_time + target_frame_time + + producer.produce( + topic=topic, + key=None, + value=generate_fake_ev44( + msg_id, events_per_frame, tof_peak, tof_sigma, det_min, det_max + ), + ) + msg_id += 1 + + sleep_time = max(target_end_time - time.time(), 0) + if sleep_time == 0: + logger.warning("saluki-howl cannot keep up with target event/frame rate") + time.sleep(sleep_time) diff --git a/src/saluki/main.py b/src/saluki/main.py index 1bcf97b..a8604d6 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -6,6 +6,7 @@ from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff +from saluki.howl import howl from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") @@ -15,6 +16,7 @@ _CONSUME = "consume" _PLAY = "play" _SNIFF = "sniff" +_HOWL = "howl" def main() -> None: @@ -52,7 +54,9 @@ def main() -> None: _SNIFF, help="sniff - broker metadata", parents=[common_options] ) sniff_parser.add_argument( - "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" + "broker", + type=str, + help="broker, optionally suffixed with a topic name to filter to", ) consumer_parser = argparse.ArgumentParser(add_help=False) @@ -65,7 +69,9 @@ def main() -> None: ) consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] + _CONSUME, + help="consumer mode", + parents=[topic_parser, consumer_parser, common_options], ) consumer_mode_parser.add_argument( "-m", @@ -120,6 +126,19 @@ def main() -> None: nargs=2, ) + howl_parser = sub_parsers.add_parser( + _HOWL, + help="replay mode - replay data into another topic", + parents=[common_options], + ) + howl_parser.add_argument("topic", type=str, help="Destination topic") + howl_parser.add_argument("--events-per-frame", type=int, help="Events per frame to simulate") + howl_parser.add_argument("--frames-per-second", type=int, help="Frames per second to simulate") + howl_parser.add_argument("--tof-peak", type=float, help="Time-of-flight peak (ns)") + howl_parser.add_argument("--tof-sigma", type=float, help="Time-of-flight sigma (ns)") + howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID") + howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID") + if len(sys.argv) == 1: parser.print_help() sys.exit(1) @@ -169,6 +188,19 @@ def main() -> None: except RuntimeError: logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) + elif args.command == _HOWL: + broker, topic = parse_kafka_uri(args.topic) + logger.debug(f"Howling to topic {topic} on broker {broker}") + howl( + broker, + topic, + events_per_frame=args.events_per_frame, + frames_per_second=args.frames_per_second, + tof_peak=args.tof_peak, + tof_sigma=args.tof_sigma, + det_min=args.det_min, + det_max=args.det_max, + ) if __name__ == "__main__": From 215d8c390d40d0c421595a6c5cfc08dff93b6478 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Mon, 12 Jan 2026 18:34:42 +0000 Subject: [PATCH 02/14] howl MORE --- README.md | 7 ++-- src/saluki/howl.py | 81 ++++++++++++++++++++++++++++++++++++--------- src/saluki/main.py | 38 ++++++++++++++------- src/saluki/play.py | 3 +- src/saluki/utils.py | 4 ++- tests/test_howl.py | 0 tests/test_utils.py | 7 ++-- 7 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 tests/test_howl.py diff --git a/README.md b/README.md index 5df7a65..beeddba 100644 --- a/README.md +++ b/README.md @@ -59,10 +59,13 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps. -## `howl` - Produce fake `ev44` messages to a topic +## `howl` - Produce fake run-like messages + +`saluki-howl` emits `ev44` events, `pl72` run starts, and `6s4t` run stops to Kafka, in a format which +look somewhat like a real run. ``` -saluki howl mybroker:9092/dest_topic --events-per-frame 200 --frames-per-second 50 --tof-peak 10000000 --tof-sigma 5000000 --det-min 0 --det-max 500 +saluki howl mybroker:9092 SOME_PREFIX ``` # Developer setup diff --git a/src/saluki/howl.py b/src/saluki/howl.py index d06774e..7dc3781 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -1,16 +1,19 @@ +import json import logging import time +import uuid import numpy as np from confluent_kafka import Producer -from streaming_data_types import serialise_ev44 +from streaming_data_types import serialise_6s4t, serialise_ev44, serialise_pl72 +from streaming_data_types.run_start_pl72 import DetectorSpectrumMap logger = logging.getLogger("saluki") RNG = np.random.default_rng() -def generate_fake_ev44( +def generate_fake_events( msg_id: int, events_per_frame: int, tof_peak: float, @@ -18,7 +21,7 @@ def generate_fake_ev44( det_min: int, det_max: int, ) -> bytes: - detector_ids = np.random.randint(low=det_min, high=det_max, size=events_per_frame) + detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame) tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) return serialise_ev44( @@ -31,50 +34,98 @@ def generate_fake_ev44( ) +def generate_run_start(det_max: int) -> bytes: + det_spec_map = DetectorSpectrumMap( + detector_ids=np.arange(0, det_max, dtype=np.int32), + spectrum_numbers=np.arange(0, det_max, dtype=np.int32), + n_spectra=det_max, + ) + return serialise_pl72( + start_time=int(time.time() * 1000), + stop_time=None, + run_name=f"saluki-howl-{uuid.uuid4()}", + instrument_name="saluki-howl", + nexus_structure=json.dumps({}), + job_id=str(uuid.uuid4()), + filename=str(uuid.uuid4()), + detector_spectrum_map=det_spec_map, + ) + + +def generate_run_stop() -> bytes: + return serialise_6s4t( + stop_time=int(time.time() * 1000), + job_id=str(uuid.uuid4()), + ) + + def howl( broker: str, - topic: str, + topic_prefix: str, events_per_frame: int, frames_per_second: int, + frames_per_run: int, tof_peak: float, tof_sigma: float, det_min: int, det_max: int, ) -> None: """ - Prints the broker and topic metadata for a given broker. - If a topic is given, only this topic's partitions and watermarks will be printed. - :param broker: The broker address including port number. - :param topic: Optional topic to filter information to. + Send messages vaguely resembling a run to Kafka. """ producer = Producer( { "bootstrap.servers": broker, + "queue.buffering.max.messages": 100000, + "queue.buffering.max.ms": 20, } ) target_frame_time = 1 / frames_per_second - msg_id = 0 + frames = 0 - ev44_size = len(generate_fake_ev44(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)) + ev44_size = len( + generate_fake_events(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max) + ) rate_bytes_per_sec = ev44_size * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 - logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} MBit/s") + logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s") + logger.info(f"Each ev44 is {ev44_size} bytes") + + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + ) while True: start_time = time.time() target_end_time = start_time + target_frame_time producer.produce( - topic=topic, + topic=f"{topic_prefix}_events", key=None, - value=generate_fake_ev44( - msg_id, events_per_frame, tof_peak, tof_sigma, det_min, det_max + value=generate_fake_events( + frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max ), ) - msg_id += 1 + producer.poll(0) + frames += 1 + + if frames_per_run != 0 and frames % frames_per_run == 0: + logger.info(f"Starting new run after {frames_per_run} simulated frames") + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_stop(), + ) + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + ) sleep_time = max(target_end_time - time.time(), 0) if sleep_time == 0: diff --git a/src/saluki/main.py b/src/saluki/main.py index a8604d6..5cc793a 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -3,10 +3,10 @@ import sys from saluki.consume import consume +from saluki.howl import howl from saluki.listen import listen from saluki.play import play from saluki.sniff import sniff -from saluki.howl import howl from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri logger = logging.getLogger("saluki") @@ -131,13 +131,28 @@ def main() -> None: help="replay mode - replay data into another topic", parents=[common_options], ) - howl_parser.add_argument("topic", type=str, help="Destination topic") - howl_parser.add_argument("--events-per-frame", type=int, help="Events per frame to simulate") - howl_parser.add_argument("--frames-per-second", type=int, help="Frames per second to simulate") - howl_parser.add_argument("--tof-peak", type=float, help="Time-of-flight peak (ns)") - howl_parser.add_argument("--tof-sigma", type=float, help="Time-of-flight sigma (ns)") - howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID") - howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID") + howl_parser.add_argument("broker", type=str, help="Kafka broker URL") + howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. INSTNAME") + howl_parser.add_argument( + "--events-per-frame", type=int, help="Events per frame to simulate", default=100 + ) + howl_parser.add_argument( + "--frames-per-second", type=int, help="Frames per second to simulate", default=1 + ) + howl_parser.add_argument( + "--frames-per-run", + type=int, + help="Frames to take before beginning new run (0 to run forever)", + default=0, + ) + howl_parser.add_argument( + "--tof-peak", type=float, help="Time-of-flight peak (ns)", default=10_000_000 + ) + howl_parser.add_argument( + "--tof-sigma", type=float, help="Time-of-flight sigma (ns)", default=2_000_000 + ) + howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID", default=0) + howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID", default=1000) if len(sys.argv) == 1: parser.print_help() @@ -189,13 +204,12 @@ def main() -> None: logger.debug(f"Sniffing whole broker {args.broker}") sniff(args.broker) elif args.command == _HOWL: - broker, topic = parse_kafka_uri(args.topic) - logger.debug(f"Howling to topic {topic} on broker {broker}") howl( - broker, - topic, + args.broker, + args.topic_prefix, events_per_frame=args.events_per_frame, frames_per_second=args.frames_per_second, + frames_per_run=args.frames_per_run, tof_peak=args.tof_peak, tof_sigma=args.tof_sigma, det_min=args.det_min, diff --git a/src/saluki/play.py b/src/saluki/play.py index 1f058e8..1bb7119 100644 --- a/src/saluki/play.py +++ b/src/saluki/play.py @@ -70,7 +70,8 @@ def play( logger.debug(f"finished consuming {num_messages} messages") consumer.close() producer.produce_batch( - dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] + dest_topic, + [{"key": message.key(), "value": message.value()} for message in msgs], ) logger.debug(f"flushing producer. len(p): {len(producer)}") producer.flush(timeout=10) diff --git a/src/saluki/utils.py b/src/saluki/utils.py index d033a57..62845ef 100644 --- a/src/saluki/utils.py +++ b/src/saluki/utils.py @@ -34,7 +34,9 @@ def fallback_deserialiser(payload: bytes) -> str: def deserialise_and_print_messages( - msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None + msgs: List[Message], + partition: int | None, + schemas_to_filter_to: list[str] | None = None, ) -> None: for msg in msgs: try: diff --git a/tests/test_howl.py b/tests/test_howl.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_utils.py b/tests/test_utils.py index 263c280..cf19970 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -78,7 +78,9 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): assert logger.info.call_count == 1 -def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): +def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list( + mock_message, +): with patch("saluki.utils.logger") as logger: ok_message = Mock(spec=Message) ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore @@ -179,7 +181,8 @@ def test_uri_with_no_topic(): @pytest.mark.parametrize( - "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] + "timestamp", + ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"], ) def test_parses_datetime_properly_with_string(timestamp): assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000 From 147edd70b0e0942017f3d5b91f89a9cda8abe5c8 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 22 Jan 2026 09:58:28 +0000 Subject: [PATCH 03/14] More flags is more better --- src/saluki/howl.py | 26 +++++++++++++++++++------- src/saluki/listen.py | 3 +++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 7dc3781..b991eaa 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -23,6 +23,7 @@ def generate_fake_events( ) -> bytes: detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame) tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) + tofs.sort() return serialise_ev44( source_name="saluki", @@ -77,8 +78,12 @@ def howl( producer = Producer( { "bootstrap.servers": broker, + "queue.buffering.max.kbytes": 512*1024, "queue.buffering.max.messages": 100000, - "queue.buffering.max.ms": 20, + "queue.buffering.max.ms": 100, + "linger.ms": 50, + "batch.size": 512 * 1024**2, + "request.required.acks": 0, } ) @@ -100,9 +105,10 @@ def howl( value=generate_run_start(det_max), ) + target_time = time.time() + while True: - start_time = time.time() - target_end_time = start_time + target_frame_time + target_time += target_frame_time producer.produce( topic=f"{topic_prefix}_events", @@ -110,6 +116,7 @@ def howl( value=generate_fake_events( frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max ), + timestamp=int(time.time() * 1000), ) producer.poll(0) frames += 1 @@ -120,14 +127,19 @@ def howl( topic=f"{topic_prefix}_runInfo", key=None, value=generate_run_stop(), + timestamp=int(time.time() * 1000), ) producer.produce( topic=f"{topic_prefix}_runInfo", key=None, value=generate_run_start(det_max), + timestamp=int(time.time() * 1000), ) - sleep_time = max(target_end_time - time.time(), 0) - if sleep_time == 0: - logger.warning("saluki-howl cannot keep up with target event/frame rate") - time.sleep(sleep_time) + sleep_time = max(target_time - time.time(), 0) + if sleep_time > 0: + time.sleep(sleep_time) + + t_diff = abs(time.time() - target_time) + if t_diff > 10: + logger.warning(f"saluki-howl running {t_diff:.3f} seconds behind schedule") diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 82a205d..4eaea19 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -28,6 +28,9 @@ def listen( "group.id": f"saluki-listen-{uuid.uuid4()}", "auto.offset.reset": "latest", "enable.auto.commit": False, + "fetch.message.max.bytes": 512 * 1024**2, # 512MB + "fetch.max.bytes": 512 * 1024**2, # 512MB + "max.partition.fetch.bytes": 512 * 1024**2, # 512MB } ) c.subscribe([topic]) From 2c2440908162e17e128bf8b000728948b897d15f Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 22 Jan 2026 17:54:51 +0000 Subject: [PATCH 04/14] fmt --- src/saluki/howl.py | 12 ++++++++---- src/saluki/listen.py | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index b991eaa..330ef5c 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -78,12 +78,12 @@ def howl( producer = Producer( { "bootstrap.servers": broker, - "queue.buffering.max.kbytes": 512*1024, + "queue.buffering.max.kbytes": 1024 * 1024, "queue.buffering.max.messages": 100000, "queue.buffering.max.ms": 100, - "linger.ms": 50, + "linger.ms": 10, "batch.size": 512 * 1024**2, - "request.required.acks": 0, + "batch.num.messages": 100_000, } ) @@ -96,7 +96,10 @@ def howl( ) rate_bytes_per_sec = ev44_size * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 - logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s") + logger.info( + f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s " + f"({rate_mbit_per_sec / 8:.3f} MiB/s)" + ) logger.info(f"Each ev44 is {ev44_size} bytes") producer.produce( @@ -104,6 +107,7 @@ def howl( key=None, value=generate_run_start(det_max), ) + producer.flush() target_time = time.time() diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 4eaea19..874777b 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -26,10 +26,10 @@ def listen( { "bootstrap.servers": broker, "group.id": f"saluki-listen-{uuid.uuid4()}", - "auto.offset.reset": "latest", "enable.auto.commit": False, "fetch.message.max.bytes": 512 * 1024**2, # 512MB "fetch.max.bytes": 512 * 1024**2, # 512MB + "fetch.min.bytes": 512 * 1024**2, # 1MB "max.partition.fetch.bytes": 512 * 1024**2, # 512MB } ) From 556acbc86c0631defd1c2ba34311d4282c3b04bb Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Wed, 18 Feb 2026 13:13:02 +0000 Subject: [PATCH 05/14] Add tests --- src/saluki/howl.py | 110 +++++++++++++++++++++++++++------------------ tests/test_howl.py | 88 ++++++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 43 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 330ef5c..f470e6e 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -60,6 +60,55 @@ def generate_run_stop() -> bytes: ) +def make_producer(broker: str) -> Producer: + return Producer( + { + "bootstrap.servers": broker, + "queue.buffering.max.kbytes": 1024 * 1024, + "queue.buffering.max.messages": 100000, + "linger.ms": 10, + "batch.num.messages": 10_000, + "max.in.flight.requests.per.connection": 32, + }, + ) + + +def produce_messages( + producer: Producer, + topic_prefix: str, + frame: int, + events_per_frame: int, + frames_per_run: int, + tof_peak: float, + tof_sigma: float, + det_min: int, + det_max: int, +) -> None: + now = time.time() + producer.produce( + topic=f"{topic_prefix}_events", + key=None, + value=generate_fake_events(frame, events_per_frame, tof_peak, tof_sigma, det_min, det_max), + timestamp=int(now * 1000), + ) + producer.poll(0) + + if frames_per_run != 0 and frame % frames_per_run == 0: + logger.info(f"Starting new run after {frames_per_run} simulated frames") + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_stop(), + timestamp=int(now * 1000), + ) + producer.produce( + topic=f"{topic_prefix}_runInfo", + key=None, + value=generate_run_start(det_max), + timestamp=int(now * 1000), + ) + + def howl( broker: str, topic_prefix: str, @@ -70,22 +119,11 @@ def howl( tof_sigma: float, det_min: int, det_max: int, -) -> None: +) -> None: # pragma: no cover (infinite loop) """ Send messages vaguely resembling a run to Kafka. """ - - producer = Producer( - { - "bootstrap.servers": broker, - "queue.buffering.max.kbytes": 1024 * 1024, - "queue.buffering.max.messages": 100000, - "queue.buffering.max.ms": 100, - "linger.ms": 10, - "batch.size": 512 * 1024**2, - "batch.num.messages": 100_000, - } - ) + producer = make_producer(broker) target_frame_time = 1 / frames_per_second @@ -96,6 +134,7 @@ def howl( ) rate_bytes_per_sec = ev44_size * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 + logger.info( f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s " f"({rate_mbit_per_sec / 8:.3f} MiB/s)" @@ -107,43 +146,28 @@ def howl( key=None, value=generate_run_start(det_max), ) - producer.flush() target_time = time.time() while True: target_time += target_frame_time + frames += 1 - producer.produce( - topic=f"{topic_prefix}_events", - key=None, - value=generate_fake_events( - frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max - ), - timestamp=int(time.time() * 1000), + produce_messages( + producer, + topic_prefix, + frames, + events_per_frame, + frames_per_run, + tof_peak, + tof_sigma, + det_min, + det_max, ) - producer.poll(0) - frames += 1 - if frames_per_run != 0 and frames % frames_per_run == 0: - logger.info(f"Starting new run after {frames_per_run} simulated frames") - producer.produce( - topic=f"{topic_prefix}_runInfo", - key=None, - value=generate_run_stop(), - timestamp=int(time.time() * 1000), - ) - producer.produce( - topic=f"{topic_prefix}_runInfo", - key=None, - value=generate_run_start(det_max), - timestamp=int(time.time() * 1000), - ) - - sleep_time = max(target_time - time.time(), 0) + sleep_time = target_time - time.time() + if sleep_time > 0: time.sleep(sleep_time) - - t_diff = abs(time.time() - target_time) - if t_diff > 10: - logger.warning(f"saluki-howl running {t_diff:.3f} seconds behind schedule") + elif sleep_time < -10: + logger.warning(f"saluki-howl running {abs(sleep_time):.3f} seconds behind schedule") diff --git a/tests/test_howl.py b/tests/test_howl.py index e69de29..00479e1 100644 --- a/tests/test_howl.py +++ b/tests/test_howl.py @@ -0,0 +1,88 @@ +from unittest.mock import ANY, MagicMock, call + +import numpy as np +from confluent_kafka.cimpl import Producer +from streaming_data_types import deserialise_6s4t, deserialise_ev44, deserialise_pl72 + +from saluki.howl import ( + generate_fake_events, + generate_run_start, + generate_run_stop, + make_producer, + produce_messages, +) + + +def test_generate_run_start(): + pl72 = deserialise_pl72(generate_run_start(50000)) + assert pl72.detector_spectrum_map.n_spectra == 50000 + + +def test_generate_run_stop(): + deserialise_6s4t(generate_run_stop()) + + +def test_generate_events(): + ev44 = deserialise_ev44( + generate_fake_events( + msg_id=123, + events_per_frame=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + det_max=6, + ) + ) + assert ev44.message_id == 123 + assert ev44.pixel_id == np.array([5], dtype=np.int32) + assert ev44.time_of_flight == np.array([12345], dtype=np.int32) + + +def test_make_producer(): + # Just test it doesn't crash - can't usefully test much more than that + make_producer("127.0.0.1") + + +def test_produce_event_messages(): + producer = MagicMock(spec=Producer) + + produce_messages( + producer, + "some_prefix", + frame=1, + frames_per_run=10, + events_per_frame=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + det_max=6, + ) + + producer.produce.assert_called_once_with( + topic="some_prefix_events", key=None, value=ANY, timestamp=ANY + ) + + +def test_produce_runinfo_messages(): + producer = MagicMock(spec=Producer) + + produce_messages( + producer, + "some_prefix", + frame=10, + frames_per_run=10, + events_per_frame=1, + tof_peak=12345, + tof_sigma=0, + det_min=5, + det_max=6, + ) + + # event followed by run stop and run start pair. + producer.produce.assert_has_calls( + [ + call(topic="some_prefix_events", key=None, value=ANY, timestamp=ANY), + call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), + call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), + ] + ) From 131f8d081cd120498bb67743c95a47f79fa89f53 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Wed, 18 Feb 2026 13:14:47 +0000 Subject: [PATCH 06/14] pyright --- tests/test_howl.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_howl.py b/tests/test_howl.py index 00479e1..8a2ff23 100644 --- a/tests/test_howl.py +++ b/tests/test_howl.py @@ -15,7 +15,9 @@ def test_generate_run_start(): pl72 = deserialise_pl72(generate_run_start(50000)) - assert pl72.detector_spectrum_map.n_spectra == 50000 + det_spec_map = pl72.detector_spectrum_map + assert det_spec_map is not None + assert det_spec_map.n_spectra == 50000 def test_generate_run_stop(): From 8710e5045c4c987f46ea5e79a22a92b2016ee88d Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Tue, 3 Mar 2026 11:14:39 +0000 Subject: [PATCH 07/14] Remove fetch.min.bytes from Kafka consumer config Removed the 'fetch.min.bytes' configuration setting. --- src/saluki/listen.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/saluki/listen.py b/src/saluki/listen.py index 874777b..e7c942e 100644 --- a/src/saluki/listen.py +++ b/src/saluki/listen.py @@ -29,7 +29,6 @@ def listen( "enable.auto.commit": False, "fetch.message.max.bytes": 512 * 1024**2, # 512MB "fetch.max.bytes": 512 * 1024**2, # 512MB - "fetch.min.bytes": 512 * 1024**2, # 1MB "max.partition.fetch.bytes": 512 * 1024**2, # 512MB } ) From 3ee198c9e9c2454749ca794970caa68e592cfb29 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 12 Mar 2026 16:43:27 +0000 Subject: [PATCH 08/14] messages per frame --- src/saluki/howl.py | 38 ++++++++++++++++++++++---------------- src/saluki/main.py | 8 ++++++-- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index f470e6e..66f3fe0 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -15,19 +15,20 @@ def generate_fake_events( msg_id: int, - events_per_frame: int, + events_per_message: int, tof_peak: float, tof_sigma: float, det_min: int, det_max: int, + timestamp: float, ) -> bytes: - detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame) - tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame)) + detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_message) + tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_message)) tofs.sort() return serialise_ev44( source_name="saluki", - reference_time=[time.time() * 1_000_000_000], + reference_time=[timestamp * 1_000_000_000], message_id=msg_id, reference_time_index=[0], time_of_flight=tofs, @@ -69,6 +70,7 @@ def make_producer(broker: str) -> Producer: "linger.ms": 10, "batch.num.messages": 10_000, "max.in.flight.requests.per.connection": 32, + "acks": 1, }, ) @@ -77,7 +79,8 @@ def produce_messages( producer: Producer, topic_prefix: str, frame: int, - events_per_frame: int, + events_per_message: int, + messages_per_frame: int, frames_per_run: int, tof_peak: float, tof_sigma: float, @@ -85,12 +88,13 @@ def produce_messages( det_max: int, ) -> None: now = time.time() - producer.produce( - topic=f"{topic_prefix}_events", - key=None, - value=generate_fake_events(frame, events_per_frame, tof_peak, tof_sigma, det_min, det_max), - timestamp=int(now * 1000), - ) + for _ in range(messages_per_frame): + producer.produce( + topic=f"{topic_prefix}_rawEvents", + key=None, + value=generate_fake_events(frame, events_per_message, tof_peak, tof_sigma, det_min, det_max, timestamp=now), + timestamp=int(now * 1000), + ) producer.poll(0) if frames_per_run != 0 and frame % frames_per_run == 0: @@ -112,7 +116,8 @@ def produce_messages( def howl( broker: str, topic_prefix: str, - events_per_frame: int, + events_per_message: int, + messages_per_frame: int, frames_per_second: int, frames_per_run: int, tof_peak: float, @@ -130,9 +135,9 @@ def howl( frames = 0 ev44_size = len( - generate_fake_events(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max) + generate_fake_events(0, events_per_message, tof_peak, tof_sigma, det_min, det_max, timestamp=time.time()) ) - rate_bytes_per_sec = ev44_size * frames_per_second + rate_bytes_per_sec = ev44_size * messages_per_frame * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 logger.info( @@ -157,7 +162,8 @@ def howl( producer, topic_prefix, frames, - events_per_frame, + events_per_message, + messages_per_frame, frames_per_run, tof_peak, tof_sigma, @@ -169,5 +175,5 @@ def howl( if sleep_time > 0: time.sleep(sleep_time) - elif sleep_time < -10: + else: logger.warning(f"saluki-howl running {abs(sleep_time):.3f} seconds behind schedule") diff --git a/src/saluki/main.py b/src/saluki/main.py index 5cc793a..caa2562 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -134,7 +134,10 @@ def main() -> None: howl_parser.add_argument("broker", type=str, help="Kafka broker URL") howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. INSTNAME") howl_parser.add_argument( - "--events-per-frame", type=int, help="Events per frame to simulate", default=100 + "--events-per-message", type=int, help="Events per ev44 to simulate", default=100 + ) + howl_parser.add_argument( + "--messages-per-frame", type=int, help="Number of ev44 per frame to simulate", default=20 ) howl_parser.add_argument( "--frames-per-second", type=int, help="Frames per second to simulate", default=1 @@ -207,7 +210,8 @@ def main() -> None: howl( args.broker, args.topic_prefix, - events_per_frame=args.events_per_frame, + events_per_message=args.events_per_message, + messages_per_frame=args.messages_per_frame, frames_per_second=args.frames_per_second, frames_per_run=args.frames_per_run, tof_peak=args.tof_peak, From fd2297c497cf3802ead8c0f5fb0c8f097d0dea5c Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 12 Mar 2026 16:43:51 +0000 Subject: [PATCH 09/14] messages per frame --- src/saluki/howl.py | 20 ++++++++++++++++++-- src/saluki/main.py | 10 ++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 66f3fe0..0e9bf04 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -92,7 +92,15 @@ def produce_messages( producer.produce( topic=f"{topic_prefix}_rawEvents", key=None, - value=generate_fake_events(frame, events_per_message, tof_peak, tof_sigma, det_min, det_max, timestamp=now), + value=generate_fake_events( + frame, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + timestamp=now, + ), timestamp=int(now * 1000), ) producer.poll(0) @@ -135,7 +143,15 @@ def howl( frames = 0 ev44_size = len( - generate_fake_events(0, events_per_message, tof_peak, tof_sigma, det_min, det_max, timestamp=time.time()) + generate_fake_events( + 0, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + timestamp=time.time(), + ) ) rate_bytes_per_sec = ev44_size * messages_per_frame * frames_per_second rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8 diff --git a/src/saluki/main.py b/src/saluki/main.py index caa2562..e54500f 100644 --- a/src/saluki/main.py +++ b/src/saluki/main.py @@ -134,10 +134,16 @@ def main() -> None: howl_parser.add_argument("broker", type=str, help="Kafka broker URL") howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. INSTNAME") howl_parser.add_argument( - "--events-per-message", type=int, help="Events per ev44 to simulate", default=100 + "--events-per-message", + type=int, + help="Events per ev44 to simulate", + default=100, ) howl_parser.add_argument( - "--messages-per-frame", type=int, help="Number of ev44 per frame to simulate", default=20 + "--messages-per-frame", + type=int, + help="Number of ev44 per frame to simulate", + default=20, ) howl_parser.add_argument( "--frames-per-second", type=int, help="Frames per second to simulate", default=1 From 942841785f78afe414a5b4bdb3ddeca0af8b6197 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 12 Mar 2026 17:27:47 +0000 Subject: [PATCH 10/14] cheat --- src/saluki/howl.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 0e9bf04..27991cf 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -88,19 +88,21 @@ def produce_messages( det_max: int, ) -> None: now = time.time() + ev44 = generate_fake_events( + frame, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + timestamp=now, + ) + for _ in range(messages_per_frame): producer.produce( topic=f"{topic_prefix}_rawEvents", key=None, - value=generate_fake_events( - frame, - events_per_message, - tof_peak, - tof_sigma, - det_min, - det_max, - timestamp=now, - ), + value=ev44, timestamp=int(now * 1000), ) producer.poll(0) From d7d5143fa636d35c46cbaf119922477e14e33e00 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Fri, 13 Mar 2026 09:11:30 +0000 Subject: [PATCH 11/14] unbreak tests --- tests/test_howl.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_howl.py b/tests/test_howl.py index 8a2ff23..8d7b8fe 100644 --- a/tests/test_howl.py +++ b/tests/test_howl.py @@ -28,11 +28,12 @@ def test_generate_events(): ev44 = deserialise_ev44( generate_fake_events( msg_id=123, - events_per_frame=1, + events_per_message=1, tof_peak=12345, tof_sigma=0, det_min=5, det_max=6, + timestamp=12345, ) ) assert ev44.message_id == 123 @@ -53,7 +54,8 @@ def test_produce_event_messages(): "some_prefix", frame=1, frames_per_run=10, - events_per_frame=1, + messages_per_frame=1, + events_per_message=1, tof_peak=12345, tof_sigma=0, det_min=5, @@ -61,7 +63,7 @@ def test_produce_event_messages(): ) producer.produce.assert_called_once_with( - topic="some_prefix_events", key=None, value=ANY, timestamp=ANY + topic="some_prefix_rawEvents", key=None, value=ANY, timestamp=ANY ) @@ -73,7 +75,8 @@ def test_produce_runinfo_messages(): "some_prefix", frame=10, frames_per_run=10, - events_per_frame=1, + messages_per_frame=1, + events_per_message=1, tof_peak=12345, tof_sigma=0, det_min=5, @@ -83,7 +86,7 @@ def test_produce_runinfo_messages(): # event followed by run stop and run start pair. producer.produce.assert_has_calls( [ - call(topic="some_prefix_events", key=None, value=ANY, timestamp=ANY), + call(topic="some_prefix_rawEvents", key=None, value=ANY, timestamp=ANY), call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), call(topic="some_prefix_runInfo", key=None, value=ANY, timestamp=ANY), ] From f470364c5968a044ce26400cf57fad7727d36271 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Fri, 13 Mar 2026 09:20:54 +0000 Subject: [PATCH 12/14] Add basic nexus structure --- src/saluki/howl.py | 36 ++++++++++++++++++++++++++++++++---- tests/test_howl.py | 2 +- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 27991cf..1873e3a 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -36,18 +36,46 @@ def generate_fake_events( ) -def generate_run_start(det_max: int) -> bytes: +def generate_run_start(det_max: int, topic_prefix: str) -> bytes: det_spec_map = DetectorSpectrumMap( detector_ids=np.arange(0, det_max, dtype=np.int32), spectrum_numbers=np.arange(0, det_max, dtype=np.int32), n_spectra=det_max, ) + + nexus_structure = { + "children": [ + { + "type": "group", + "name": "raw_data_1", + "children": [ + { + "type": "group", + "name": "events", + "children": [ + { + "type": "stream", + "stream": { + "topic": f"{topic_prefix}_events", + "source": "saluki_howl", + "writer_module": "ev44", + }, + }, + ], + "attributes": [{"name": "NX_class", "values": "NXentry"}], + }, + ], + "attributes": [{"name": "NX_class", "values": "NXentry"}], + } + ] + } + return serialise_pl72( start_time=int(time.time() * 1000), stop_time=None, run_name=f"saluki-howl-{uuid.uuid4()}", instrument_name="saluki-howl", - nexus_structure=json.dumps({}), + nexus_structure=json.dumps(nexus_structure), job_id=str(uuid.uuid4()), filename=str(uuid.uuid4()), detector_spectrum_map=det_spec_map, @@ -118,7 +146,7 @@ def produce_messages( producer.produce( topic=f"{topic_prefix}_runInfo", key=None, - value=generate_run_start(det_max), + value=generate_run_start(det_max, topic_prefix), timestamp=int(now * 1000), ) @@ -167,7 +195,7 @@ def howl( producer.produce( topic=f"{topic_prefix}_runInfo", key=None, - value=generate_run_start(det_max), + value=generate_run_start(det_max, topic_prefix), ) target_time = time.time() diff --git a/tests/test_howl.py b/tests/test_howl.py index 8d7b8fe..3c63e84 100644 --- a/tests/test_howl.py +++ b/tests/test_howl.py @@ -14,7 +14,7 @@ def test_generate_run_start(): - pl72 = deserialise_pl72(generate_run_start(50000)) + pl72 = deserialise_pl72(generate_run_start(50000, "test")) det_spec_map = pl72.detector_spectrum_map assert det_spec_map is not None assert det_spec_map.n_spectra == 50000 From 6741f06c04958ac12006660565353d2a7e574536 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Fri, 13 Mar 2026 09:27:09 +0000 Subject: [PATCH 13/14] Make proper jobID pairs --- src/saluki/howl.py | 17 ++++++++++------- tests/test_howl.py | 6 ++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index 1873e3a..a02f176 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -36,7 +36,7 @@ def generate_fake_events( ) -def generate_run_start(det_max: int, topic_prefix: str) -> bytes: +def generate_run_start(det_max: int, topic_prefix: str, job_id: str) -> bytes: det_spec_map = DetectorSpectrumMap( detector_ids=np.arange(0, det_max, dtype=np.int32), spectrum_numbers=np.arange(0, det_max, dtype=np.int32), @@ -76,16 +76,16 @@ def generate_run_start(det_max: int, topic_prefix: str) -> bytes: run_name=f"saluki-howl-{uuid.uuid4()}", instrument_name="saluki-howl", nexus_structure=json.dumps(nexus_structure), - job_id=str(uuid.uuid4()), + job_id=job_id, filename=str(uuid.uuid4()), detector_spectrum_map=det_spec_map, ) -def generate_run_stop() -> bytes: +def generate_run_stop(job_id: str) -> bytes: return serialise_6s4t( stop_time=int(time.time() * 1000), - job_id=str(uuid.uuid4()), + job_id=job_id, ) @@ -114,6 +114,7 @@ def produce_messages( tof_sigma: float, det_min: int, det_max: int, + current_job_id: str, ) -> None: now = time.time() ev44 = generate_fake_events( @@ -140,13 +141,14 @@ def produce_messages( producer.produce( topic=f"{topic_prefix}_runInfo", key=None, - value=generate_run_stop(), + value=generate_run_stop(current_job_id), timestamp=int(now * 1000), ) + current_job_id = str(uuid.uuid4()) producer.produce( topic=f"{topic_prefix}_runInfo", key=None, - value=generate_run_start(det_max, topic_prefix), + value=generate_run_start(det_max, topic_prefix, current_job_id), timestamp=int(now * 1000), ) @@ -192,10 +194,11 @@ def howl( ) logger.info(f"Each ev44 is {ev44_size} bytes") + current_job_id = str(uuid.uuid4()) producer.produce( topic=f"{topic_prefix}_runInfo", key=None, - value=generate_run_start(det_max, topic_prefix), + value=generate_run_start(det_max, topic_prefix, current_job_id), ) target_time = time.time() diff --git a/tests/test_howl.py b/tests/test_howl.py index 3c63e84..d846151 100644 --- a/tests/test_howl.py +++ b/tests/test_howl.py @@ -14,14 +14,14 @@ def test_generate_run_start(): - pl72 = deserialise_pl72(generate_run_start(50000, "test")) + pl72 = deserialise_pl72(generate_run_start(50000, "test", "some_job_id")) det_spec_map = pl72.detector_spectrum_map assert det_spec_map is not None assert det_spec_map.n_spectra == 50000 def test_generate_run_stop(): - deserialise_6s4t(generate_run_stop()) + deserialise_6s4t(generate_run_stop("some_job_id")) def test_generate_events(): @@ -60,6 +60,7 @@ def test_produce_event_messages(): tof_sigma=0, det_min=5, det_max=6, + current_job_id="some_job_id", ) producer.produce.assert_called_once_with( @@ -81,6 +82,7 @@ def test_produce_runinfo_messages(): tof_sigma=0, det_min=5, det_max=6, + current_job_id="some_job_id", ) # event followed by run stop and run start pair. From f1ae9dbdfd8112bae7329c4d78b269c7b8e49c38 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Fri, 13 Mar 2026 09:45:00 +0000 Subject: [PATCH 14/14] For real this time --- src/saluki/howl.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/saluki/howl.py b/src/saluki/howl.py index a02f176..f53edfe 100644 --- a/src/saluki/howl.py +++ b/src/saluki/howl.py @@ -115,7 +115,11 @@ def produce_messages( det_min: int, det_max: int, current_job_id: str, -) -> None: +) -> str: + """ + Returns: + Currently open JobID + """ now = time.time() ev44 = generate_fake_events( frame, @@ -151,6 +155,7 @@ def produce_messages( value=generate_run_start(det_max, topic_prefix, current_job_id), timestamp=int(now * 1000), ) + return current_job_id def howl( @@ -207,7 +212,7 @@ def howl( target_time += target_frame_time frames += 1 - produce_messages( + current_job_id = produce_messages( producer, topic_prefix, frames, @@ -218,6 +223,7 @@ def howl( tof_sigma, det_min, det_max, + current_job_id, ) sleep_time = target_time - time.time()