Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions concore_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def send_json_with_retry(self, message):
except zmq.Again:
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logger.error("Failed to send after retries.")
return
raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}")

def recv_json_with_retry(self):
"""Receive JSON message with retries if timeout occurs."""
Expand All @@ -56,8 +55,7 @@ def recv_json_with_retry(self):
except zmq.Again:
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logger.error("Failed to receive after retries.")
return None
raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}")


def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
Expand Down Expand Up @@ -221,6 +219,9 @@ def read(mod, port_identifier, name, initstr_val):
mod.simtime = max(mod.simtime, first_element)
return message[1:]
return message
except TimeoutError as e:
logger.error(f"ZMQ recv timeout on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
except zmq.error.ZMQError as e:
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
Expand Down Expand Up @@ -304,6 +305,8 @@ def write(mod, port_identifier, name, val, delta=0):
# Mutation breaks cross-language determinism (see issue #385).
else:
zmq_p.send_json_with_retry(zmq_val)
except TimeoutError as e:
logger.error(f"ZMQ send timeout on port {port_identifier} (name: {name}): {e}")
except zmq.error.ZMQError as e:
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
Expand Down
88 changes: 88 additions & 0 deletions tests/test_concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,91 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir):
"After 3 writes with delta=1 simtime must remain 0 "
"(matching C++/MATLAB/Verilog); got %s" % concore.simtime
)


# ===================================================================
# ZMQ Retry Exhaustion Tests (Issue #393)
# ===================================================================

class TestZMQRetryExhaustion:
    """Tests for issue #393 — TimeoutError on retry exhaustion.

    Covers two layers:

    * ``ZeroMQPort.send_json_with_retry`` / ``recv_json_with_retry`` must
      raise ``TimeoutError`` once every attempt has failed with ``zmq.Again``.
    * ``concore.read`` / ``concore.write`` must absorb that ``TimeoutError``
      instead of propagating it to the caller.
    """

    @pytest.fixture(autouse=True)
    def reset_zmq_ports(self):
        """Snapshot ``concore.zmq_ports`` and restore it after each test."""
        import concore
        original_ports = concore.zmq_ports.copy()
        yield
        concore.zmq_ports.clear()
        concore.zmq_ports.update(original_ports)

    @pytest.fixture(autouse=True)
    def reset_simtime(self):
        """Restore ``concore.simtime`` after each test."""
        import concore
        old_simtime = concore.simtime
        yield
        concore.simtime = old_simtime

    def test_recv_json_with_retry_raises_timeout_error(self):
        """recv_json_with_retry must raise TimeoutError after 5 failed attempts."""
        from concore import ZeroMQPort
        from unittest.mock import MagicMock, patch
        import zmq

        # __new__ bypasses __init__, so no real socket is created and there
        # is no need to patch the constructor.
        port = ZeroMQPort.__new__(ZeroMQPort)
        port.socket = MagicMock()
        port.socket.recv_json.side_effect = zmq.Again()
        port.address = "tcp://test:5555"

        # The retry loop calls time.sleep(0.5) after every failed attempt;
        # stub it out so the test does not spend seconds sleeping.
        with patch("time.sleep"):
            with pytest.raises(TimeoutError, match="ZMQ recv failed after 5 retries"):
                port.recv_json_with_retry()

        assert port.socket.recv_json.call_count == 5

    def test_send_json_with_retry_raises_timeout_error(self):
        """send_json_with_retry must raise TimeoutError after 5 failed attempts."""
        from concore import ZeroMQPort
        from unittest.mock import MagicMock, patch
        import zmq

        # __new__ bypasses __init__, so no real socket is created and there
        # is no need to patch the constructor.
        port = ZeroMQPort.__new__(ZeroMQPort)
        port.socket = MagicMock()
        port.socket.send_json.side_effect = zmq.Again()
        port.address = "tcp://test:5555"

        # The retry loop calls time.sleep(0.5) after every failed attempt;
        # stub it out so the test does not spend seconds sleeping.
        with patch("time.sleep"):
            with pytest.raises(TimeoutError, match="ZMQ send failed after 5 retries"):
                port.send_json_with_retry({"test": "data"})

        assert port.socket.send_json.call_count == 5

    def test_read_returns_default_on_zmq_timeout(self):
        """read() must return default_return_val when recv exhausts retries, not None."""
        import concore

        class MockZMQPort:
            """Port stand-in whose receive always reports retry exhaustion."""

            def recv_json_with_retry(self):
                raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")

        concore.zmq_ports["test_timeout_port"] = MockZMQPort()
        concore.simtime = 0

        result = concore.read("test_timeout_port", "test_name", "[1.0, 2.0]")

        assert result == [1.0, 2.0], (
            "read() must return default_return_val on TimeoutError, got %s" % result
        )

    def test_write_does_not_crash_on_zmq_send_timeout(self):
        """write() must handle TimeoutError from send gracefully."""
        import concore

        class MockZMQPort:
            """Port stand-in whose send always reports retry exhaustion."""

            def send_json_with_retry(self, message):
                raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")

        concore.zmq_ports["test_timeout_port"] = MockZMQPort()
        concore.simtime = 0

        # Should not raise — just log the error
        concore.write("test_timeout_port", "test_name", [1.0, 2.0])
54 changes: 54 additions & 0 deletions tests/test_concoredocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,57 @@ def recv_json_with_retry(self):
result = concoredocker.read("roundtrip", "data", "[]")

assert result == original


# ===================================================================
# ZMQ Retry Exhaustion Tests (Issue #393)
# ===================================================================

class TestZMQRetryExhaustion:
    """Tests for issue #393 — TimeoutError on retry exhaustion via concoredocker."""

    @pytest.fixture(autouse=True)
    def reset_zmq_ports(self):
        """Put ``concoredocker.zmq_ports`` back to its pre-test contents."""
        import concoredocker
        saved_ports = concoredocker.zmq_ports.copy()
        yield
        registry = concoredocker.zmq_ports
        registry.clear()
        registry.update(saved_ports)

    @pytest.fixture(autouse=True)
    def reset_simtime(self):
        """Put ``concoredocker.simtime`` back to its pre-test value."""
        import concoredocker
        saved_simtime = concoredocker.simtime
        yield
        concoredocker.simtime = saved_simtime

    def test_read_returns_default_on_zmq_timeout(self):
        """read() must return default_return_val when recv exhausts retries, not None."""
        import concoredocker

        class MockZMQPort:
            """Receiving port that always signals retry exhaustion."""

            def recv_json_with_retry(self):
                raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")

        port_key = "test_timeout_port"
        concoredocker.zmq_ports[port_key] = MockZMQPort()
        concoredocker.simtime = 0

        result = concoredocker.read(port_key, "test_name", "[1.0, 2.0]")

        # The third argument to read() is the textual default; the parsed
        # list is what must come back when the receive times out.
        expected_default = [1.0, 2.0]
        assert result == expected_default, (
            "read() must return default_return_val on TimeoutError, got %s" % result
        )

    def test_write_does_not_crash_on_zmq_send_timeout(self):
        """write() must handle TimeoutError from send gracefully."""
        import concoredocker

        class MockZMQPort:
            """Sending port that always signals retry exhaustion."""

            def send_json_with_retry(self, message):
                raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")

        port_key = "test_timeout_port"
        concoredocker.zmq_ports[port_key] = MockZMQPort()
        concoredocker.simtime = 0

        # Passing means no exception escapes; the failure is only logged.
        payload = [1.0, 2.0]
        concoredocker.write(port_key, "test_name", payload)