Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ class SandboxPython:
"vars",
"help",
"dir",
"getattr",
"setattr",
"delattr",
"breakpoint",
}

RESTRICTED_ATTRS: ClassVar[set[str]] = {
"__class__",
"__bases__",
"__subclasses__",
"__mro__",
"__qualname__",
"__getattribute__",
"__getattr__",
}

@staticmethod
Expand Down Expand Up @@ -126,6 +140,22 @@ def safe_builtins() -> dict[str, Any]:
safe_builtins["__import__"] = SandboxPython.restricted_import
return safe_builtins

@staticmethod
def _check_restricted_attrs(code: str) -> None:
"""Checks if the code contains any restricted attribute access patterns.

Args:
code: The Python code to check.

Raises:
RuntimeError: If the code contains restricted attribute names.
"""
for attr in SandboxPython.RESTRICTED_ATTRS:
if attr in code:
raise RuntimeError(
f"Access to '{attr}' is not allowed in the sandbox."
)

@staticmethod
def exec(code: str, locals_: dict[str, Any]) -> None:
"""Executes Python code in a restricted environment.
Expand All @@ -134,6 +164,7 @@ def exec(code: str, locals_: dict[str, Any]) -> None:
code: The Python code to execute as a string.
locals_: A dictionary that will be used for local variable storage.
"""
SandboxPython._check_restricted_attrs(code)
exec(code, {"__builtins__": SandboxPython.safe_builtins()}, locals_) # noqa: S102


Expand Down Expand Up @@ -380,7 +411,13 @@ def run_code_unsafe(code: str, libraries_used: list[str]) -> str:
Printer.print("WARNING: Running code in unsafe mode", color="bold_magenta")
# Install libraries on the host machine
for library in libraries_used:
os.system(f"pip install {library}") # noqa: S605
try:
subprocess.run(
["pip", "install", library], # noqa: S607
check=True,
)
except subprocess.CalledProcessError as e:
return f"Failed to install library '{library}': {e}"

# Execute the code
try:
Expand Down
132 changes: 132 additions & 0 deletions lib/crewai-tools/tests/tools/test_code_interpreter_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,135 @@ def test_unsafe_mode_running_unsafe_code(printer_mock, docker_unavailable_mock):
"WARNING: Running code in unsafe mode", color="bold_magenta"
)
assert 5.0 == result


# --- Security fix tests ---


class TestCommandInjectionFix:
"""Tests that library names cannot be used for command injection."""

def test_unsafe_mode_uses_subprocess_not_os_system(self, printer_mock):
"""Verify that run_code_unsafe uses subprocess.run with list args,
not os.system with string interpolation."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 1 + 1"
malicious_library = "numpy; echo pwned"

with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
) as subprocess_mock:
tool.run_code_unsafe(code, [malicious_library])

# subprocess.run should be called with a list, not a shell string
subprocess_mock.assert_called_once_with(
["pip", "install", malicious_library],
check=True,
)

def test_unsafe_mode_no_os_system_call(self, printer_mock):
"""Verify os.system is never called during library installation."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 1 + 1"

with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.os.system"
) as os_system_mock, patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
):
tool.run_code_unsafe(code, ["numpy"])
os_system_mock.assert_not_called()

def test_unsafe_mode_installs_multiple_libraries_safely(self, printer_mock):
"""Verify each library is installed as a separate subprocess call."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 1 + 1"
libraries = ["numpy", "pandas", "requests"]

with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
) as subprocess_mock:
tool.run_code_unsafe(code, libraries)

assert subprocess_mock.call_count == 3
for lib in libraries:
subprocess_mock.assert_any_call(
["pip", "install", lib],
check=True,
)


class TestSandboxEscapeFix:
"""Tests that sandbox escape via __subclasses__ introspection is blocked."""

def test_subclasses_introspection_blocked(
self, printer_mock, docker_unavailable_mock
):
"""The classic sandbox escape via __subclasses__() must be blocked."""
tool = CodeInterpreterTool()
code = """
for c in ().__class__.__bases__[0].__subclasses__():
if c.__name__ == 'BuiltinImporter':
result = c.load_module('os').system('id')
break
"""
result = tool.run(code=code, libraries_used=[])
assert "not allowed in the sandbox" in result

def test_class_attr_blocked(self, printer_mock, docker_unavailable_mock):
"""Direct __class__ access must be blocked."""
tool = CodeInterpreterTool()
code = 'result = "".__class__'
result = tool.run(code=code, libraries_used=[])
assert "not allowed in the sandbox" in result

def test_bases_attr_blocked(self, printer_mock, docker_unavailable_mock):
"""Direct __bases__ access must be blocked."""
tool = CodeInterpreterTool()
code = "result = str.__bases__"
result = tool.run(code=code, libraries_used=[])
assert "not allowed in the sandbox" in result

def test_mro_attr_blocked(self, printer_mock, docker_unavailable_mock):
"""Direct __mro__ access must be blocked."""
tool = CodeInterpreterTool()
code = "result = str.__mro__"
result = tool.run(code=code, libraries_used=[])
assert "not allowed in the sandbox" in result

def test_subclasses_attr_blocked(self, printer_mock, docker_unavailable_mock):
"""Direct __subclasses__ access must be blocked."""
tool = CodeInterpreterTool()
code = "result = object.__subclasses__()"
result = tool.run(code=code, libraries_used=[])
assert "not allowed in the sandbox" in result

def test_getattr_blocked_in_sandbox(self, printer_mock, docker_unavailable_mock):
"""getattr() must be blocked to prevent attribute access bypass."""
tool = CodeInterpreterTool()
code = """
result = getattr(str, "__bases__")
"""
result = tool.run(code=code, libraries_used=[])
# Should be blocked by either the restricted attrs check or the builtins removal
assert "not allowed" in result or "not defined" in result

def test_safe_code_still_works(self, printer_mock, docker_unavailable_mock):
"""Normal code without introspection must still execute correctly."""
tool = CodeInterpreterTool()
code = """
x = [1, 2, 3]
result = sum(x) * 2
"""
result = tool.run(code=code, libraries_used=[])
assert result == 12

def test_restricted_attrs_check_method(self):
"""Test _check_restricted_attrs directly."""
# Safe code should pass
SandboxPython._check_restricted_attrs("result = 2 + 2")

# Each restricted attr should raise
for attr in SandboxPython.RESTRICTED_ATTRS:
with pytest.raises(RuntimeError, match="not allowed in the sandbox"):
SandboxPython._check_restricted_attrs(f"x = obj.{attr}")