Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
zmq_ports = {}
_cleanup_in_progress = False

last_read_status = "SUCCESS"

s = ''
olds = ''
delay = 1
Expand Down Expand Up @@ -110,7 +112,32 @@ def unchanged():
# I/O Handling (File + ZMQ)
# ===================================================================
def read(port_identifier, name, initstr_val):
return concore_base.read(_mod, port_identifier, name, initstr_val)
"""Read data from a ZMQ port or file-based port.

Returns:
tuple: (data, success_flag) where success_flag is True if real
data was received, False if a fallback/default was used.
Also sets ``concore.last_read_status`` to one of:
SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR,
EMPTY_DATA, RETRIES_EXCEEDED.

Backward compatibility:
Legacy callers that do ``value = concore.read(...)`` will
receive a tuple. They can adapt with::

result = concore.read(...)
if isinstance(result, tuple):
value, ok = result
else:
value, ok = result, True

Alternatively, check ``concore.last_read_status`` after the
call.
"""
global last_read_status
result = concore_base.read(_mod, port_identifier, name, initstr_val)
last_read_status = concore_base.last_read_status
return result


def write(port_identifier, name, val, delta=0):
Expand Down
74 changes: 64 additions & 10 deletions concore_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def load_params(params_file):
except Exception:
return dict()

# ===================================================================
# Read Status Tracking
# ===================================================================
last_read_status = "SUCCESS"

# ===================================================================
# I/O Handling (File + ZMQ)
# ===================================================================
Expand All @@ -201,6 +206,31 @@ def unchanged(mod):


def read(mod, port_identifier, name, initstr_val):
"""Read data from a ZMQ port or file-based port.

Returns:
tuple: (data, success_flag) where success_flag is True if real
data was received, False if a fallback/default was used.
Also sets ``concore.last_read_status`` (and
``concore_base.last_read_status``) to one of:
SUCCESS, FILE_NOT_FOUND, TIMEOUT, PARSE_ERROR,
EMPTY_DATA, RETRIES_EXCEEDED.

Backward compatibility:
Legacy callers that do ``value = concore.read(...)`` will
receive a tuple. They can adapt with::

result = concore.read(...)
if isinstance(result, tuple):
value, ok = result
else:
value, ok = result, True

Alternatively, check ``concore.last_read_status`` after the
call.
"""
global last_read_status

# Default return
default_return_val = initstr_val
if isinstance(initstr_val, str):
Expand All @@ -214,41 +244,52 @@ def read(mod, port_identifier, name, initstr_val):
zmq_p = mod.zmq_ports[port_identifier]
try:
message = zmq_p.recv_json_with_retry()
if message is None:
last_read_status = "TIMEOUT"
return default_return_val, False
# Strip simtime prefix if present (mirroring file-based read behavior)
if isinstance(message, list) and len(message) > 0:
first_element = message[0]
if isinstance(first_element, (int, float)):
mod.simtime = max(mod.simtime, first_element)
return message[1:]
return message
last_read_status = "SUCCESS"
return message[1:], True
last_read_status = "SUCCESS"
return message, True
except zmq.error.ZMQError as e:
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
last_read_status = "TIMEOUT"
return default_return_val, False
except Exception as e:
logger.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
last_read_status = "PARSE_ERROR"
return default_return_val, False

# Case 2: File-based port
try:
file_port_num = int(port_identifier)
except ValueError:
logger.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return default_return_val
last_read_status = "PARSE_ERROR"
return default_return_val, False

time.sleep(mod.delay)
port_dir = mod._port_path(mod.inpath, file_port_num)
file_path = os.path.join(port_dir, name)
ins = ""

file_not_found = False
try:
with open(file_path, "r") as infile:
ins = infile.read()
except FileNotFoundError:
file_not_found = True
ins = str(initstr_val)
mod.s += ins # Update s to break unchanged() loop
except Exception as e:
logger.error(f"Error reading {file_path}: {e}. Using default value.")
return default_return_val
last_read_status = "FILE_NOT_FOUND"
return default_return_val, False

# Retry logic if file is empty
attempts = 0
Expand All @@ -265,7 +306,8 @@ def read(mod, port_identifier, name, initstr_val):

if len(ins) == 0:
logger.error(f"Max retries reached for {file_path}, using default value.")
return default_return_val
last_read_status = "RETRIES_EXCEEDED"
return default_return_val, False

mod.s += ins

Expand All @@ -276,13 +318,25 @@ def read(mod, port_identifier, name, initstr_val):
current_simtime_from_file = inval[0]
if isinstance(current_simtime_from_file, (int, float)):
mod.simtime = max(mod.simtime, current_simtime_from_file)
return inval[1:]
if file_not_found:
last_read_status = "FILE_NOT_FOUND"
return inval[1:], False
last_read_status = "SUCCESS"
return inval[1:], True
else:
logger.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
return inval
if file_not_found:
last_read_status = "FILE_NOT_FOUND"
return inval, False
last_read_status = "SUCCESS"
return inval, True
except Exception as e:
logger.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
return default_return_val
if file_not_found:
last_read_status = "FILE_NOT_FOUND"
else:
last_read_status = "PARSE_ERROR"
return default_return_val, False


def write(mod, port_identifier, name, val, delta=0):
Expand Down
7 changes: 6 additions & 1 deletion concoredocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
zmq_ports = {}
_cleanup_in_progress = False

last_read_status = "SUCCESS"

s = ''
olds = ''
delay = 1
Expand Down Expand Up @@ -90,7 +92,10 @@ def unchanged():
# I/O Handling (File + ZMQ)
# ===================================================================
def read(port_identifier, name, initstr_val):
return concore_base.read(_mod, port_identifier, name, initstr_val)
global last_read_status
result = concore_base.read(_mod, port_identifier, name, initstr_val)
last_read_status = concore_base.last_read_status
return result

def write(port_identifier, name, val, delta=0):
concore_base.write(_mod, port_identifier, name, val, delta)
Expand Down
5 changes: 3 additions & 2 deletions tests/test_concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,10 @@ def recv_json_with_retry(self):
original_data = [1.5, 2.5, 3.5]
concore.write("roundtrip_test", "data", original_data)

# Read should return original data (simtime stripped)
result = concore.read("roundtrip_test", "data", "[]")
# Read should return original data (simtime stripped) plus success flag
result, ok = concore.read("roundtrip_test", "data", "[]")
assert result == original_data
assert ok is True


class TestSimtimeNotMutatedByWrite:
Expand Down
12 changes: 8 additions & 4 deletions tests/test_concoredocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ def test_reads_and_parses_data(self, temp_dir):

concoredocker.s = ''
concoredocker.simtime = 0
result = concoredocker.read(1, "data", "[0, 0, 0]")
result, ok = concoredocker.read(1, "data", "[0, 0, 0]")

assert result == [100, 200]
assert ok is True
assert concoredocker.simtime == 7.0
concoredocker.inpath = old_inpath
concoredocker.delay = old_delay
Expand All @@ -148,9 +149,10 @@ def test_returns_default_when_file_missing(self, temp_dir):

concoredocker.s = ''
concoredocker.simtime = 0
result = concoredocker.read(1, "nofile", "[0, 5, 5]")
result, ok = concoredocker.read(1, "nofile", "[0, 5, 5]")

assert result == [5, 5]
assert ok is False
concoredocker.inpath = old_inpath
concoredocker.delay = old_delay

Expand Down Expand Up @@ -194,9 +196,10 @@ def recv_json_with_retry(self):
concoredocker.zmq_ports["test_zmq"] = DummyPort()
concoredocker.simtime = 0

result = concoredocker.read("test_zmq", "data", "[]")
result, ok = concoredocker.read("test_zmq", "data", "[]")

assert result == [4.0, 5.0]
assert ok is True
assert concoredocker.simtime == 10.0

def test_read_updates_simtime_monotonically(self):
Expand Down Expand Up @@ -232,6 +235,7 @@ def recv_json_with_retry(self):

original = [1.5, 2.5, 3.5]
concoredocker.write("roundtrip", "data", original)
result = concoredocker.read("roundtrip", "data", "[]")
result, ok = concoredocker.read("roundtrip", "data", "[]")

assert result == original
assert ok is True
Loading