From b6f2a0d17c5bbb1b8b0f0cb65a34187bfb0b7b65 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 24 Feb 2026 13:14:40 +0530 Subject: [PATCH] Fix: Add explicit error signalling to read() (#390) read() now returns (data, success_flag) tuple and sets concore.last_read_status / concore_base.last_read_status to one of: SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR, EMPTY_DATA, RETRIES_EXCEEDED. - concore_base.read(): all return paths now yield (data, bool) - concore.py: exposes last_read_status, syncs from concore_base - concoredocker.py: same treatment - Existing tests updated for tuple returns - New test_read_status.py covers success, file-not-found, parse error, retries exceeded, ZMQ success/timeout/error, backward compatibility, and last_read_status exposure. Backward compatible: callers can use isinstance(result, tuple) or simply unpack with value, ok = concore.read(...). --- concore.py | 29 +++- concore_base.py | 74 ++++++++-- concoredocker.py | 7 +- tests/test_concore.py | 5 +- tests/test_concoredocker.py | 12 +- tests/test_read_status.py | 261 ++++++++++++++++++++++++++++++++++++ 6 files changed, 370 insertions(+), 18 deletions(-) create mode 100644 tests/test_read_status.py diff --git a/concore.py b/concore.py index 5bd112b..2147e75 100644 --- a/concore.py +++ b/concore.py @@ -38,6 +38,8 @@ zmq_ports = {} _cleanup_in_progress = False +last_read_status = "SUCCESS" + s = '' olds = '' delay = 1 @@ -110,7 +112,32 @@ def unchanged(): # I/O Handling (File + ZMQ) # =================================================================== def read(port_identifier, name, initstr_val): - return concore_base.read(_mod, port_identifier, name, initstr_val) + """Read data from a ZMQ port or file-based port. + + Returns: + tuple: (data, success_flag) where success_flag is True if real + data was received, False if a fallback/default was used. + Also sets ``concore.last_read_status`` to one of: + SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR, + EMPTY_DATA, RETRIES_EXCEEDED. + + Backward compatibility: + Legacy callers that do ``value = concore.read(...)`` will + receive a tuple. They can adapt with:: + + result = concore.read(...) + if isinstance(result, tuple): + value, ok = result + else: + value, ok = result, True + + Alternatively, check ``concore.last_read_status`` after the + call. + """ + global last_read_status + result = concore_base.read(_mod, port_identifier, name, initstr_val) + last_read_status = concore_base.last_read_status + return result def write(port_identifier, name, val, delta=0): diff --git a/concore_base.py b/concore_base.py index f6b3e1b..d64c526 100644 --- a/concore_base.py +++ b/concore_base.py @@ -187,6 +187,11 @@ def load_params(params_file): except Exception: return dict() +# =================================================================== +# Read Status Tracking +# =================================================================== +last_read_status = "SUCCESS" + # =================================================================== # I/O Handling (File + ZMQ) # =================================================================== @@ -201,6 +206,31 @@ def unchanged(mod): def read(mod, port_identifier, name, initstr_val): + """Read data from a ZMQ port or file-based port. + + Returns: + tuple: (data, success_flag) where success_flag is True if real + data was received, False if a fallback/default was used. + Also sets ``concore.last_read_status`` (and + ``concore_base.last_read_status``) to one of: + SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR, + EMPTY_DATA, RETRIES_EXCEEDED. + + Backward compatibility: + Legacy callers that do ``value = concore.read(...)`` will + receive a tuple. They can adapt with:: + + result = concore.read(...) + if isinstance(result, tuple): + value, ok = result + else: + value, ok = result, True + + Alternatively, check ``concore.last_read_status`` after the + call. + """ + global last_read_status + # Default return default_return_val = initstr_val if isinstance(initstr_val, str): @@ -214,41 +244,52 @@ def read(mod, port_identifier, name, initstr_val): zmq_p = mod.zmq_ports[port_identifier] try: message = zmq_p.recv_json_with_retry() + if message is None: + last_read_status = "TIMEOUT" + return default_return_val, False # Strip simtime prefix if present (mirroring file-based read behavior) if isinstance(message, list) and len(message) > 0: first_element = message[0] if isinstance(first_element, (int, float)): mod.simtime = max(mod.simtime, first_element) - return message[1:] - return message + last_read_status = "SUCCESS" + return message[1:], True + last_read_status = "SUCCESS" + return message, True except zmq.error.ZMQError as e: logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") - return default_return_val + last_read_status = "TIMEOUT" + return default_return_val, False except Exception as e: logger.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.") - return default_return_val + last_read_status = "PARSE_ERROR" + return default_return_val, False # Case 2: File-based port try: file_port_num = int(port_identifier) except ValueError: logger.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.") - return default_return_val + last_read_status = "PARSE_ERROR" + return default_return_val, False time.sleep(mod.delay) port_dir = mod._port_path(mod.inpath, file_port_num) file_path = os.path.join(port_dir, name) ins = "" + file_not_found = False try: with open(file_path, "r") as infile: ins = infile.read() except FileNotFoundError: + file_not_found = True ins = str(initstr_val) mod.s += ins # Update s to break unchanged() loop except Exception as e: logger.error(f"Error reading {file_path}: {e}. Using default value.") - return default_return_val + last_read_status = "FILE_NOT_FOUND" + return default_return_val, False # Retry logic if file is empty attempts = 0 @@ -265,7 +306,8 @@ def read(mod, port_identifier, name, initstr_val): if len(ins) == 0: logger.error(f"Max retries reached for {file_path}, using default value.") - return default_return_val + last_read_status = "RETRIES_EXCEEDED" + return default_return_val, False mod.s += ins @@ -276,13 +318,25 @@ def read(mod, port_identifier, name, initstr_val): current_simtime_from_file = inval[0] if isinstance(current_simtime_from_file, (int, float)): mod.simtime = max(mod.simtime, current_simtime_from_file) - return inval[1:] + if file_not_found: + last_read_status = "FILE_NOT_FOUND" + return inval[1:], False + last_read_status = "SUCCESS" + return inval[1:], True else: logger.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.") - return inval + if file_not_found: + last_read_status = "FILE_NOT_FOUND" + return inval, False + last_read_status = "SUCCESS" + return inval, True except Exception as e: logger.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.") - return default_return_val + if file_not_found: + last_read_status = "FILE_NOT_FOUND" + else: + last_read_status = "PARSE_ERROR" + return default_return_val, False def write(mod, port_identifier, name, val, delta=0): diff --git a/concoredocker.py b/concoredocker.py index 84d4caa..51df0eb 100644 --- a/concoredocker.py +++ b/concoredocker.py @@ -25,6 +25,8 @@ zmq_ports = {} _cleanup_in_progress = False +last_read_status = "SUCCESS" + s = '' olds = '' delay = 1 @@ -90,7 +92,10 @@ def unchanged(): # I/O Handling (File + ZMQ) # =================================================================== def read(port_identifier, name, initstr_val): - return concore_base.read(_mod, port_identifier, name, initstr_val) + global last_read_status + result = concore_base.read(_mod, port_identifier, name, initstr_val) + last_read_status = concore_base.last_read_status + return result def write(port_identifier, name, val, delta=0): concore_base.write(_mod, port_identifier, name, val, delta) diff --git a/tests/test_concore.py b/tests/test_concore.py index fd89729..58e99c3 100644 --- a/tests/test_concore.py +++ b/tests/test_concore.py @@ -273,9 +273,10 @@ def recv_json_with_retry(self): original_data = [1.5, 2.5, 3.5] concore.write("roundtrip_test", "data", original_data) - # Read should return original data (simtime stripped) - result = concore.read("roundtrip_test", "data", "[]") + # Read should return original data (simtime stripped) plus success flag + result, ok = concore.read("roundtrip_test", "data", "[]") assert result == original_data + assert ok is True class TestSimtimeNotMutatedByWrite: diff --git a/tests/test_concoredocker.py b/tests/test_concoredocker.py index c374391..91bf2fb 100644 --- a/tests/test_concoredocker.py +++ b/tests/test_concoredocker.py @@ -130,9 +130,10 @@ def test_reads_and_parses_data(self, temp_dir): concoredocker.s = '' concoredocker.simtime = 0 - result = concoredocker.read(1, "data", "[0, 0, 0]") + result, ok = concoredocker.read(1, "data", "[0, 0, 0]") assert result == [100, 200] + assert ok is True assert concoredocker.simtime == 7.0 concoredocker.inpath = old_inpath concoredocker.delay = old_delay @@ -148,9 +149,10 @@ def test_returns_default_when_file_missing(self, temp_dir): concoredocker.s = '' concoredocker.simtime = 0 - result = concoredocker.read(1, "nofile", "[0, 5, 5]") + result, ok = concoredocker.read(1, "nofile", "[0, 5, 5]") assert result == [5, 5] + assert ok is False concoredocker.inpath = old_inpath concoredocker.delay = old_delay @@ -194,9 +196,10 @@ def recv_json_with_retry(self): concoredocker.zmq_ports["test_zmq"] = DummyPort() concoredocker.simtime = 0 - result = concoredocker.read("test_zmq", "data", "[]") + result, ok = concoredocker.read("test_zmq", "data", "[]") assert result == [4.0, 5.0] + assert ok is True assert concoredocker.simtime == 10.0 def test_read_updates_simtime_monotonically(self): @@ -232,6 +235,7 @@ def recv_json_with_retry(self): original = [1.5, 2.5, 3.5] concoredocker.write("roundtrip", "data", original) - result = concoredocker.read("roundtrip", "data", "[]") + result, ok = concoredocker.read("roundtrip", "data", "[]") assert result == original + assert ok is True diff --git a/tests/test_read_status.py b/tests/test_read_status.py new file mode 100644 index 0000000..54dc4b8 --- /dev/null +++ b/tests/test_read_status.py @@ -0,0 +1,261 @@ +"""Tests for read() error signalling (Issue #390). + +read() now returns (data, success_flag) and sets +concore.last_read_status / concore_base.last_read_status. +""" + +import os +import pytest +import numpy as np + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class DummyZMQPort: + """Minimal stand-in for ZeroMQPort used in ZMQ read tests.""" + + def __init__(self, response=None, raise_on_recv=None): + self._response = response + self._raise_on_recv = raise_on_recv + + def send_json_with_retry(self, message): + self._response = message + + def recv_json_with_retry(self): + if self._raise_on_recv: + raise self._raise_on_recv + return self._response + + +# --------------------------------------------------------------------------- +# File-based read tests +# --------------------------------------------------------------------------- + +class TestReadFileSuccess: + """read() on a valid file returns (data, True) with SUCCESS status.""" + + @pytest.fixture(autouse=True) + def setup(self, temp_dir, monkeypatch): + import concore + self.concore = concore + monkeypatch.setattr(concore, 'delay', 0) + + # Create ./in1/ym with valid data: [simtime, value] + in_dir = os.path.join(temp_dir, "in1") + os.makedirs(in_dir, exist_ok=True) + with open(os.path.join(in_dir, "ym"), "w") as f: + f.write("[10, 3.14]") + + monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + + def test_returns_data_and_true(self): + data, ok = self.concore.read(1, "ym", "[0, 0.0]") + assert ok is True + assert data == [3.14] + + def test_last_read_status_is_success(self): + self.concore.read(1, "ym", "[0, 0.0]") + assert self.concore.last_read_status == "SUCCESS" + + +class TestReadFileMissing: + """read() on a missing file returns (default, False) with FILE_NOT_FOUND.""" + + @pytest.fixture(autouse=True) + def setup(self, temp_dir, monkeypatch): + import concore + self.concore = concore + monkeypatch.setattr(concore, 'delay', 0) + # Point to a directory that does NOT have the file + monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + + def test_returns_default_and_false(self): + data, ok = self.concore.read(1, "nonexistent", "[0, 0.0]") + assert ok is False + + def test_last_read_status_is_file_not_found(self): + self.concore.read(1, "nonexistent", "[0, 0.0]") + assert self.concore.last_read_status == "FILE_NOT_FOUND" + + +class TestReadFileParseError: + """read() returns (default, False) with PARSE_ERROR on malformed content.""" + + @pytest.fixture(autouse=True) + def setup(self, temp_dir, monkeypatch): + import concore + self.concore = concore + monkeypatch.setattr(concore, 'delay', 0) + + in_dir = os.path.join(temp_dir, "in1") + os.makedirs(in_dir, exist_ok=True) + with open(os.path.join(in_dir, "ym"), "w") as f: + f.write("NOT_VALID_PYTHON{{{") + + monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + + def test_returns_default_and_false(self): + data, ok = self.concore.read(1, "ym", "[0, 0.0]") + assert ok is False + + def test_last_read_status_is_parse_error(self): + self.concore.read(1, "ym", "[0, 0.0]") + assert self.concore.last_read_status == "PARSE_ERROR" + + +class TestReadFileRetriesExceeded: + """read() returns (default, False) with RETRIES_EXCEEDED when file is empty.""" + + @pytest.fixture(autouse=True) + def setup(self, temp_dir, monkeypatch): + import concore + self.concore = concore + monkeypatch.setattr(concore, 'delay', 0) + + # Create an empty file + in_dir = os.path.join(temp_dir, "in1") + os.makedirs(in_dir, exist_ok=True) + with open(os.path.join(in_dir, "ym"), "w") as f: + pass # empty + + monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + + def test_returns_default_and_false(self): + data, ok = self.concore.read(1, "ym", "[0, 0.0]") + assert ok is False + + def test_last_read_status_is_retries_exceeded(self): + self.concore.read(1, "ym", "[0, 0.0]") + assert self.concore.last_read_status == "RETRIES_EXCEEDED" + + +# --------------------------------------------------------------------------- +# ZMQ read tests +# --------------------------------------------------------------------------- + +class TestReadZMQSuccess: + """Successful ZMQ read returns (data, True).""" + + @pytest.fixture(autouse=True) + def setup(self, monkeypatch): + import concore + self.concore = concore + self.original_ports = concore.zmq_ports.copy() + yield + concore.zmq_ports.clear() + concore.zmq_ports.update(self.original_ports) + + def test_zmq_read_returns_data_and_true(self): + dummy = DummyZMQPort(response=[5, 1.1, 2.2]) + self.concore.zmq_ports["test_port"] = dummy + self.concore.simtime = 0 + + data, ok = self.concore.read("test_port", "ym", "[]") + assert ok is True + assert data == [1.1, 2.2] + assert self.concore.last_read_status == "SUCCESS" + + +class TestReadZMQTimeout: + """ZMQ read that returns None (timeout) yields (default, False).""" + + @pytest.fixture(autouse=True) + def setup(self, monkeypatch): + import concore + self.concore = concore + self.original_ports = concore.zmq_ports.copy() + yield + concore.zmq_ports.clear() + concore.zmq_ports.update(self.original_ports) + + def test_zmq_timeout_returns_default_and_false(self): + dummy = DummyZMQPort(response=None) # recv returns None → timeout + self.concore.zmq_ports["test_port"] = dummy + + data, ok = self.concore.read("test_port", "ym", "[]") + assert ok is False + assert self.concore.last_read_status == "TIMEOUT" + + +class TestReadZMQError: + """ZMQ read that raises ZMQError yields (default, False).""" + + @pytest.fixture(autouse=True) + def setup(self, monkeypatch): + import concore + self.concore = concore + self.original_ports = concore.zmq_ports.copy() + yield + concore.zmq_ports.clear() + concore.zmq_ports.update(self.original_ports) + + def test_zmq_error_returns_default_and_false(self): + import zmq + dummy = DummyZMQPort(raise_on_recv=zmq.error.ZMQError("test error")) + self.concore.zmq_ports["test_port"] = dummy + + data, ok = self.concore.read("test_port", "ym", "[]") + assert ok is False + assert self.concore.last_read_status == "TIMEOUT" + + +# --------------------------------------------------------------------------- +# Backward compatibility +# --------------------------------------------------------------------------- + +class TestReadBackwardCompatibility: + """Legacy callers can use isinstance check on the result.""" + + @pytest.fixture(autouse=True) + def setup(self, temp_dir, monkeypatch): + import concore + self.concore = concore + monkeypatch.setattr(concore, 'delay', 0) + + in_dir = os.path.join(temp_dir, "in1") + os.makedirs(in_dir, exist_ok=True) + with open(os.path.join(in_dir, "ym"), "w") as f: + f.write("[10, 42.0]") + + monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in")) + + def test_legacy_unpack_pattern(self): + """The recommended migration pattern works correctly.""" + result = self.concore.read(1, "ym", "[0, 0.0]") + + if isinstance(result, tuple): + value, ok = result + else: + value = result + ok = True + + assert value == [42.0] + assert ok is True + + def test_tuple_unpack(self): + """New-style callers can unpack directly.""" + value, ok = self.concore.read(1, "ym", "[0, 0.0]") + assert value == [42.0] + assert ok is True + + +# --------------------------------------------------------------------------- +# last_read_status exposed on module +# --------------------------------------------------------------------------- + +class TestLastReadStatusExposed: + """concore.last_read_status is publicly accessible.""" + + def test_attribute_exists(self): + import concore + assert hasattr(concore, 'last_read_status') + + def test_initial_value_is_success(self): + import concore + # Before any read, default is SUCCESS + assert concore.last_read_status in ( + "SUCCESS", "FILE_NOT_FOUND", "TIMEOUT", + "PARSE_ERROR", "EMPTY_DATA", "RETRIES_EXCEEDED", + )