diff --git a/deploy/docker/schemas.py b/deploy/docker/schemas.py
index 21d47fc44..a4a530f08 100644
--- a/deploy/docker/schemas.py
+++ b/deploy/docker/schemas.py
@@ -73,11 +73,15 @@ class HTMLRequest(BaseModel):
 
 class ScreenshotRequest(BaseModel):
     url: str
     screenshot_wait_for: Optional[float] = 2
-    output_path: Optional[str] = None
+
+class ScreenshotRequestWithOutput(ScreenshotRequest):
+    output_path: str
 
 class PDFRequest(BaseModel):
     url: str
-    output_path: Optional[str] = None
+
+class PDFRequestWithOutput(PDFRequest):
+    output_path: str
 
 class JSEndpointRequest(BaseModel):
diff --git a/deploy/docker/server.py b/deploy/docker/server.py
index 7ae1adb8b..3cfc7550f 100644
--- a/deploy/docker/server.py
+++ b/deploy/docker/server.py
@@ -29,7 +29,9 @@
     RawCode,
     HTMLRequest,
     ScreenshotRequest,
+    ScreenshotRequestWithOutput,
     PDFRequest,
+    PDFRequestWithOutput,
     JSEndpointRequest,
 )
 
@@ -83,6 +85,24 @@
 # Hooks are disabled by default for security (RCE risk). Set to "true" to enable.
 HOOKS_ENABLED = os.environ.get("CRAWL4AI_HOOKS_ENABLED", "false").lower() == "true"
 
+# Output mode for /screenshot and /pdf endpoints.
+# "disabled" (default): always return base64 inline, output_path not accepted
+# "sandboxed": write only within CRAWL4AI_OUTPUT_DIR
+# "unrestricted": original behavior, no path validation
+OUTPUT_MODE = os.environ.get("CRAWL4AI_OUTPUT_MODE", "disabled")
+OUTPUT_DIR = os.environ.get("CRAWL4AI_OUTPUT_DIR", "/tmp/crawl4ai_output")
+
+
+def safe_output_path(user_path: str) -> str:
+    """Resolve user_path within OUTPUT_DIR. Raise if it escapes."""
+    if os.path.isabs(user_path):
+        raise HTTPException(400, "output_path must be a relative path in sandboxed mode")
+    base = os.path.realpath(OUTPUT_DIR)
+    resolved = os.path.realpath(os.path.join(base, user_path))
+    if not resolved.startswith(base + os.sep) and resolved != base:
+        raise HTTPException(400, "output_path must not escape the output directory")
+    return resolved
+
 # ── default browser config helper ─────────────────────────────
 def get_default_browser_config() -> BrowserConfig:
     """Get default BrowserConfig from config.yml."""
@@ -378,13 +398,13 @@ async def generate_html(
 @mcp_tool("screenshot")
 async def generate_screenshot(
     request: Request,
-    body: ScreenshotRequest,
+    body: ScreenshotRequestWithOutput if OUTPUT_MODE != "disabled" else ScreenshotRequest,
     _td: Dict = Depends(token_dep),
 ):
     """
-    Capture a full-page PNG screenshot of the specified URL, waiting an optional delay before capture,
-    Use when you need an image snapshot of the rendered page. Its recommened to provide an output path to save the screenshot.
-    Then in result instead of the screenshot you will get a path to the saved file.
+    Capture a full-page PNG screenshot of the specified URL, waiting an optional delay before capture.
+    Use when you need an image snapshot of the rendered page. When output mode is enabled,
+    provide an output_path to save the screenshot to disk.
     """
     validate_url_scheme(body.url)
     from crawler_pool import get_crawler
@@ -395,8 +415,14 @@ async def generate_screenshot(
     if not results[0].success:
         raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
     screenshot_data = results[0].screenshot
-    if body.output_path:
-        abs_path = os.path.abspath(body.output_path)
+    output_path = getattr(body, "output_path", None)
+    if output_path:
+        if OUTPUT_MODE == "sandboxed":
+            abs_path = safe_output_path(output_path)
+        elif OUTPUT_MODE == "unrestricted":
+            abs_path = os.path.abspath(output_path)
+        else:
+            raise HTTPException(400, "File output is disabled")
         os.makedirs(os.path.dirname(abs_path), exist_ok=True)
         with open(abs_path, "wb") as f:
             f.write(base64.b64decode(screenshot_data))
@@ -413,13 +439,13 @@ async def generate_screenshot(
 @mcp_tool("pdf")
 async def generate_pdf(
     request: Request,
-    body: PDFRequest,
+    body: PDFRequestWithOutput if OUTPUT_MODE != "disabled" else PDFRequest,
     _td: Dict = Depends(token_dep),
 ):
     """
-    Generate a PDF document of the specified URL,
-    Use when you need a printable or archivable snapshot of the page. It is recommended to provide an output path to save the PDF.
-    Then in result instead of the PDF you will get a path to the saved file.
+    Generate a PDF document of the specified URL.
+    Use when you need a printable or archivable snapshot of the page. When output mode is
+    enabled, provide an output_path to save the PDF to disk.
     """
     validate_url_scheme(body.url)
     from crawler_pool import get_crawler
@@ -430,8 +456,14 @@ async def generate_pdf(
     if not results[0].success:
         raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
     pdf_data = results[0].pdf
-    if body.output_path:
-        abs_path = os.path.abspath(body.output_path)
+    output_path = getattr(body, "output_path", None)
+    if output_path:
+        if OUTPUT_MODE == "sandboxed":
+            abs_path = safe_output_path(output_path)
+        elif OUTPUT_MODE == "unrestricted":
+            abs_path = os.path.abspath(output_path)
+        else:
+            raise HTTPException(400, "File output is disabled")
         os.makedirs(os.path.dirname(abs_path), exist_ok=True)
         with open(abs_path, "wb") as f:
             f.write(pdf_data)
diff --git a/deploy/docker/tests/test_security_fixes.py b/deploy/docker/tests/test_security_fixes.py
index 9321d111e..1e9f5e32e 100644
--- a/deploy/docker/tests/test_security_fixes.py
+++ b/deploy/docker/tests/test_security_fixes.py
@@ -160,6 +160,67 @@ def test_hooks_disabled_when_false(self):
         os.environ.pop("CRAWL4AI_HOOKS_ENABLED", None)
 
 
+class TestPathTraversal(unittest.TestCase):
+    """Test path traversal protection in output_path handling."""
+
+    def setUp(self):
+        self.output_dir = os.path.join(
+            os.environ.get("TMPDIR", "/tmp"), "crawl4ai_test_output"
+        )
+        os.makedirs(self.output_dir, exist_ok=True)
+
+    def safe_output_path(self, user_path: str) -> str:
+        """Local version of safe_output_path for testing."""
+        if os.path.isabs(user_path):
+            raise ValueError("output_path must be a relative path in sandboxed mode")
+        base = os.path.realpath(self.output_dir)
+        resolved = os.path.realpath(os.path.join(base, user_path))
+        if not resolved.startswith(base + os.sep) and resolved != base:
+            raise ValueError("output_path must not escape the output directory")
+        return resolved
+
+    def test_traversal_rejected(self):
+        """Paths with ../ that escape output dir must be rejected."""
+        with self.assertRaises(ValueError):
+            self.safe_output_path("../../etc/passwd")
+
+    def test_absolute_path_rejected(self):
+        """Absolute paths must be rejected in sandboxed mode."""
+        with self.assertRaises(ValueError):
+            self.safe_output_path("/etc/passwd")
+
+    def test_valid_relative_path_accepted(self):
+        """Valid relative paths within the output dir must be accepted."""
+        result = self.safe_output_path("screenshots/test.png")
+        self.assertTrue(result.startswith(os.path.realpath(self.output_dir)))
+
+    def test_dot_dot_within_dir_accepted(self):
+        """Paths with ../ that stay within the output dir are OK."""
+        result = self.safe_output_path("subdir/../test.png")
+        self.assertTrue(result.startswith(os.path.realpath(self.output_dir)))
+
+    def test_symlink_escape_rejected(self):
+        """Symlink-based escapes must be rejected (resolved by realpath)."""
+        link_path = os.path.join(self.output_dir, "evil_link")
+        try:
+            os.symlink("/etc", link_path)
+            with self.assertRaises(ValueError):
+                self.safe_output_path("evil_link/passwd")
+        finally:
+            if os.path.islink(link_path):
+                os.unlink(link_path)
+
+    def test_output_mode_default_is_disabled(self):
+        """Default OUTPUT_MODE must be 'disabled'."""
+        original = os.environ.pop("CRAWL4AI_OUTPUT_MODE", None)
+        try:
+            mode = os.environ.get("CRAWL4AI_OUTPUT_MODE", "disabled")
+            self.assertEqual(mode, "disabled")
+        finally:
+            if original is not None:
+                os.environ["CRAWL4AI_OUTPUT_MODE"] = original
+
+
 if __name__ == '__main__':
     print("=" * 60)
     print("Crawl4AI Security Fixes - Unit Tests")