diff --git a/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py b/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py index c4a2093eec..2092c439ff 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py @@ -80,6 +80,20 @@ class SandboxPython: "vars", "help", "dir", + "getattr", + "setattr", + "delattr", + "breakpoint", + } + + RESTRICTED_ATTRS: ClassVar[set[str]] = { + "__class__", + "__bases__", + "__subclasses__", + "__mro__", + "__qualname__", + "__getattribute__", + "__getattr__", } @staticmethod @@ -126,6 +140,22 @@ def safe_builtins() -> dict[str, Any]: safe_builtins["__import__"] = SandboxPython.restricted_import return safe_builtins + @staticmethod + def _check_restricted_attrs(code: str) -> None: + """Checks if the code contains any restricted attribute access patterns. + + Args: + code: The Python code to check. + + Raises: + RuntimeError: If the code contains restricted attribute names. + """ + for attr in SandboxPython.RESTRICTED_ATTRS: + if attr in code: + raise RuntimeError( + f"Access to '{attr}' is not allowed in the sandbox." + ) + @staticmethod def exec(code: str, locals_: dict[str, Any]) -> None: """Executes Python code in a restricted environment. @@ -134,6 +164,7 @@ def exec(code: str, locals_: dict[str, Any]) -> None: code: The Python code to execute as a string. locals_: A dictionary that will be used for local variable storage. """ + SandboxPython._check_restricted_attrs(code) exec(code, {"__builtins__": SandboxPython.safe_builtins()}, locals_) # noqa: S102 @@ -380,7 +411,13 @@ def run_code_unsafe(code: str, libraries_used: list[str]) -> str: Printer.print("WARNING: Running code in unsafe mode", color="bold_magenta") # Install libraries on the host machine for library in libraries_used: - os.system(f"pip install {library}") # noqa: S605 + try: + subprocess.run( + ["pip", "install", library], # noqa: S607 + check=True, + ) + except subprocess.CalledProcessError as e: + return f"Failed to install library '{library}': {e}" # Execute the code try: diff --git a/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py b/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py index ca1f21a239..990af551dc 100644 --- a/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py +++ b/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py @@ -172,3 +172,135 @@ def test_unsafe_mode_running_unsafe_code(printer_mock, docker_unavailable_mock): "WARNING: Running code in unsafe mode", color="bold_magenta" ) assert 5.0 == result + + +# --- Security fix tests --- + + +class TestCommandInjectionFix: + """Tests that library names cannot be used for command injection.""" + + def test_unsafe_mode_uses_subprocess_not_os_system(self, printer_mock): + """Verify that run_code_unsafe uses subprocess.run with list args, + not os.system with string interpolation.""" + tool = CodeInterpreterTool(unsafe_mode=True) + code = "result = 1 + 1" + malicious_library = "numpy; echo pwned" + + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run" + ) as subprocess_mock: + tool.run_code_unsafe(code, [malicious_library]) + + # subprocess.run should be called with a list, not a shell string + subprocess_mock.assert_called_once_with( + ["pip", "install", malicious_library], + check=True, + ) + + def test_unsafe_mode_no_os_system_call(self, printer_mock): + """Verify os.system is never called during library installation.""" + tool = CodeInterpreterTool(unsafe_mode=True) + code = "result = 1 + 1" + + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.os.system" + ) as os_system_mock, patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run" + ): + tool.run_code_unsafe(code, ["numpy"]) + os_system_mock.assert_not_called() + + def test_unsafe_mode_installs_multiple_libraries_safely(self, printer_mock): + """Verify each library is installed as a separate subprocess call.""" + tool = CodeInterpreterTool(unsafe_mode=True) + code = "result = 1 + 1" + libraries = ["numpy", "pandas", "requests"] + + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run" + ) as subprocess_mock: + tool.run_code_unsafe(code, libraries) + + assert subprocess_mock.call_count == 3 + for lib in libraries: + subprocess_mock.assert_any_call( + ["pip", "install", lib], + check=True, + ) + + +class TestSandboxEscapeFix: + """Tests that sandbox escape via __subclasses__ introspection is blocked.""" + + def test_subclasses_introspection_blocked( + self, printer_mock, docker_unavailable_mock + ): + """The classic sandbox escape via __subclasses__() must be blocked.""" + tool = CodeInterpreterTool() + code = """ +for c in ().__class__.__bases__[0].__subclasses__(): + if c.__name__ == 'BuiltinImporter': + result = c.load_module('os').system('id') + break +""" + result = tool.run(code=code, libraries_used=[]) + assert "not allowed in the sandbox" in result + + def test_class_attr_blocked(self, printer_mock, docker_unavailable_mock): + """Direct __class__ access must be blocked.""" + tool = CodeInterpreterTool() + code = 'result = "".__class__' + result = tool.run(code=code, libraries_used=[]) + assert "not allowed in the sandbox" in result + + def test_bases_attr_blocked(self, printer_mock, docker_unavailable_mock): + """Direct __bases__ access must be blocked.""" + tool = CodeInterpreterTool() + code = "result = str.__bases__" + result = tool.run(code=code, libraries_used=[]) + assert "not allowed in the sandbox" in result + + def test_mro_attr_blocked(self, printer_mock, docker_unavailable_mock): + """Direct __mro__ access must be blocked.""" + tool = CodeInterpreterTool() + code = "result = str.__mro__" + result = tool.run(code=code, libraries_used=[]) + assert "not allowed in the sandbox" in result + + def test_subclasses_attr_blocked(self, printer_mock, docker_unavailable_mock): + """Direct __subclasses__ access must be blocked.""" + tool = CodeInterpreterTool() + code = "result = object.__subclasses__()" + result = tool.run(code=code, libraries_used=[]) + assert "not allowed in the sandbox" in result + + def test_getattr_blocked_in_sandbox(self, printer_mock, docker_unavailable_mock): + """getattr() must be blocked to prevent attribute access bypass.""" + tool = CodeInterpreterTool() + code = """ +result = getattr(str, "__bases__") +""" + result = tool.run(code=code, libraries_used=[]) + # Should be blocked by either the restricted attrs check or the builtins removal + assert "not allowed" in result or "not defined" in result + + def test_safe_code_still_works(self, printer_mock, docker_unavailable_mock): + """Normal code without introspection must still execute correctly.""" + tool = CodeInterpreterTool() + code = """ +x = [1, 2, 3] +result = sum(x) * 2 +""" + result = tool.run(code=code, libraries_used=[]) + assert result == 12 + + def test_restricted_attrs_check_method(self): + """Test _check_restricted_attrs directly.""" + # Safe code should pass + SandboxPython._check_restricted_attrs("result = 2 + 2") + + # Each restricted attr should raise + for attr in SandboxPython.RESTRICTED_ATTRS: + with pytest.raises(RuntimeError, match="not allowed in the sandbox"): + SandboxPython._check_restricted_attrs(f"x = obj.{attr}")