diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 8bbd7c6c..d7411ace 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -159,19 +159,25 @@ async fn route_request( (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req), // Unified auction endpoint (returns creative HTML inline) - (Method::POST, "/auction") => handle_auction(settings, orchestrator, req).await, + (Method::POST, "/auction") => { + handle_auction(settings, orchestrator, runtime_services, req).await + } // tsjs endpoints - (Method::GET, "/first-party/proxy") => handle_first_party_proxy(settings, req).await, - (Method::GET, "/first-party/click") => handle_first_party_click(settings, req).await, + (Method::GET, "/first-party/proxy") => { + handle_first_party_proxy(settings, runtime_services, req).await + } + (Method::GET, "/first-party/click") => { + handle_first_party_click(settings, runtime_services, req).await + } (Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => { - handle_first_party_proxy_sign(settings, req).await + handle_first_party_proxy_sign(settings, runtime_services, req).await } (Method::POST, "/first-party/proxy-rebuild") => { - handle_first_party_proxy_rebuild(settings, req).await + handle_first_party_proxy_rebuild(settings, runtime_services, req).await } (m, path) if integration_registry.has_route(&m, path) => integration_registry - .handle_proxy(&m, path, settings, req) + .handle_proxy(&m, path, settings, runtime_services, req) .await .unwrap_or_else(|| { Err(Report::new(TrustedServerError::BadRequest { diff --git a/crates/trusted-server-adapter-fastly/src/platform.rs b/crates/trusted-server-adapter-fastly/src/platform.rs index c7c5c3cc..b7feeb45 100644 --- a/crates/trusted-server-adapter-fastly/src/platform.rs +++ b/crates/trusted-server-adapter-fastly/src/platform.rs @@ -239,42 +239,175 @@ impl PlatformBackend for FastlyPlatformBackend { } // --------------------------------------------------------------------------- -// FastlyPlatformHttpClient +// FastlyPlatformHttpClient — helpers // --------------------------------------------------------------------------- -/// Placeholder Fastly implementation of [`PlatformHttpClient`]. +/// Convert a platform-neutral [`EdgeRequest`] to a [`fastly::Request`]. /// -/// The Fastly-backed `send` / `send_async` / `select` behavior lands in a -/// follow-up PR once the orchestrator migration is complete. Until then all -/// methods return [`PlatformError::NotImplemented`]. +/// Only `Body::Once` bodies are forwarded; `Body::Stream` bodies are not +/// used on this path (proxy.rs builds bodies from byte slices). +fn edge_request_to_fastly(request: edgezero_core::http::Request) -> fastly::Request { + let (parts, body) = request.into_parts(); + let mut fastly_req = fastly::Request::new(parts.method, parts.uri.to_string()); + for (name, value) in parts.headers.iter() { + fastly_req.set_header(name.as_str(), value.as_bytes()); + } + // Only Body::Once is supported. Body::Stream is intentionally not forwarded + // because all outbound proxy bodies are built from Vec via EdgeBody::from() + // and are therefore always Once. When this conversion moves to edgezero-adapter-fastly + // it can use send_async_streaming() to handle Stream bodies properly. + debug_assert!( + matches!(&body, edgezero_core::body::Body::Once(_)), + "unexpected Body::Stream in edge_request_to_fastly: body will be empty" + ); + if let edgezero_core::body::Body::Once(bytes) = body { + if !bytes.is_empty() { + fastly_req.set_body(bytes.to_vec()); + } + } else { + log::warn!("edge_request_to_fastly: Body::Stream not supported; body will be empty"); + } + fastly_req +} + +/// Convert a [`fastly::Response`] to a [`PlatformResponse`] with the given backend name. +fn fastly_response_to_platform( + mut resp: fastly::Response, + backend_name: impl Into, +) -> Result> { + let status = resp.get_status(); + let mut builder = edgezero_core::http::response_builder().status(status); + for (name, value) in resp.get_headers() { + builder = builder.header(name.as_str(), value.as_bytes()); + } + let body_bytes = resp.take_body_bytes(); + let edge_response = builder + .body(edgezero_core::body::Body::from(body_bytes)) + .change_context(PlatformError::HttpClient)?; + Ok(PlatformResponse::new(edge_response).with_backend_name(backend_name)) +} + +// --------------------------------------------------------------------------- +// FastlyPlatformHttpClient +// --------------------------------------------------------------------------- + +/// Fastly implementation of [`PlatformHttpClient`]. /// -/// Implementation lands in #487 (PR 6: Backend + HTTP client traits). +/// - [`send`](PlatformHttpClient::send) — converts the platform request to a +/// `fastly::Request`, calls `.send()`, and wraps the response. +/// - [`send_async`](PlatformHttpClient::send_async) — same conversion but +/// calls `.send_async()` and wraps the `fastly::PendingRequest`. +/// - [`select`](PlatformHttpClient::select) — downcasts each +/// [`PlatformPendingRequest`] back to `fastly::PendingRequest` and calls +/// `fastly::http::request::select()`. pub struct FastlyPlatformHttpClient; #[async_trait::async_trait(?Send)] impl PlatformHttpClient for FastlyPlatformHttpClient { async fn send( &self, - _request: PlatformHttpRequest, + request: PlatformHttpRequest, ) -> Result> { - Err(Report::new(PlatformError::NotImplemented) - .attach("FastlyPlatformHttpClient::send is not yet implemented")) + let backend_name = request.backend_name.clone(); + let fastly_req = edge_request_to_fastly(request.request); + let fastly_resp = fastly_req + .send(&backend_name) + .change_context(PlatformError::HttpClient)?; + fastly_response_to_platform(fastly_resp, backend_name) } async fn send_async( &self, - _request: PlatformHttpRequest, + request: PlatformHttpRequest, ) -> Result> { - Err(Report::new(PlatformError::NotImplemented) - .attach("FastlyPlatformHttpClient::send_async is not yet implemented")) + let backend_name = request.backend_name.clone(); + let fastly_req = edge_request_to_fastly(request.request); + let pending = fastly_req + .send_async(&backend_name) + .change_context(PlatformError::HttpClient)?; + Ok(PlatformPendingRequest::new(pending).with_backend_name(backend_name)) } async fn select( &self, - _pending_requests: Vec, + pending_requests: Vec, ) -> Result> { - Err(Report::new(PlatformError::NotImplemented) - .attach("FastlyPlatformHttpClient::select is not yet implemented")) + use fastly::http::request::{select, PendingRequest}; + + if pending_requests.is_empty() { + return Err(Report::new(PlatformError::HttpClient) + .attach("select called with an empty pending_requests list")); + } + + let mut fastly_pending: Vec = Vec::with_capacity(pending_requests.len()); + let mut saved_names: Vec = Vec::with_capacity(pending_requests.len()); + + for platform_req in pending_requests { + let name = platform_req.backend_name().unwrap_or("").to_string(); + let inner = platform_req.downcast::().map_err(|_| { + Report::new(PlatformError::HttpClient) + .attach("PlatformPendingRequest inner type is not fastly::PendingRequest") + })?; + fastly_pending.push(inner); + saved_names.push(name); + } + + let (result, remaining_fastly) = select(fastly_pending); + + // Re-attach saved backend names to the remaining pending requests. + // Identify which request completed by matching the response backend name + // to the saved names, then skip that index when rebuilding remaining. + let completed_name = match &result { + Ok(resp) => resp.get_backend_name().map(str::to_string), + Err(_) => None, + }; + let completed_idx = completed_name + .as_deref() + .and_then(|name| saved_names.iter().position(|n| n == name)); + if completed_name.is_some() && completed_idx.is_none() { + log::warn!( + "select: completed backend name not found in saved names; \ + remaining requests will lose backend correlation" + ); + } + + let remaining: Vec = if let Some(idx) = completed_idx { + remaining_fastly + .into_iter() + .zip( + saved_names + .into_iter() + .enumerate() + .filter(|(i, _)| *i != idx) + .map(|(_, name)| name), + ) + .map(|(req, name)| PlatformPendingRequest::new(req).with_backend_name(name)) + .collect() + } else { + remaining_fastly + .into_iter() + .map(PlatformPendingRequest::new) + .collect() + }; + + let ready = match result { + Ok(fastly_resp) => { + let backend_name = fastly_resp + .get_backend_name() + .unwrap_or_else(|| { + log::warn!("select: response has no backend name, correlation will fail"); + "" + }) + .to_string(); + fastly_response_to_platform(fastly_resp, backend_name) + } + Err(e) => { + Err(Report::new(PlatformError::HttpClient) + .attach(format!("fastly select error: {e}"))) + } + }; + + Ok(PlatformSelectResult { ready, remaining }) } } @@ -575,20 +708,73 @@ mod tests { ); } + // --- FastlyPlatformHttpClient ------------------------------------------- + #[test] - fn fastly_platform_http_client_reports_not_implemented() { + fn fastly_platform_http_client_send_returns_error_for_unregistered_backend() { let client = FastlyPlatformHttpClient; let request = request_builder() + .method("GET") .uri("https://example.com/") .body(Body::empty()) .expect("should build test request"); - let err = - futures::executor::block_on(client.send(PlatformHttpRequest::new(request, "origin"))) - .expect_err("should fail until the HTTP client is implemented"); + let err = futures::executor::block_on( + client.send(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should return error for unregistered backend"); assert!( - matches!(err.current_context(), &PlatformError::NotImplemented), - "should report NotImplemented while the Fastly HTTP client is still a stub" + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_send_async_returns_error_for_unregistered_backend() { + let client = FastlyPlatformHttpClient; + let request = request_builder() + .method("GET") + .uri("https://example.com/") + .body(Body::empty()) + .expect("should build test request"); + let err = futures::executor::block_on( + client.send_async(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should return error for unregistered backend"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_select_returns_error_for_empty_list() { + let client = FastlyPlatformHttpClient; + let err = futures::executor::block_on(client.select(vec![])) + .expect_err("should return error for empty pending list"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_select_returns_error_for_wrong_inner_type() { + let client = FastlyPlatformHttpClient; + // Wrap a non-PendingRequest type to trigger the downcast failure. + let wrong = PlatformPendingRequest::new(42u32); + let err = futures::executor::block_on(client.select(vec![wrong])) + .expect_err("should return error for wrong inner type"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() ); } } diff --git a/crates/trusted-server-core/src/auction/endpoints.rs b/crates/trusted-server-core/src/auction/endpoints.rs index 0f9e6550..105c0d3e 100644 --- a/crates/trusted-server-core/src/auction/endpoints.rs +++ b/crates/trusted-server-core/src/auction/endpoints.rs @@ -8,6 +8,7 @@ use crate::consent; use crate::cookies::handle_request_cookies; use crate::error::TrustedServerError; use crate::geo::GeoInfo; +use crate::platform::RuntimeServices; use crate::settings::Settings; use crate::synthetic::get_or_generate_synthetic_id; @@ -31,6 +32,7 @@ use super::AuctionOrchestrator; pub async fn handle_auction( settings: &Settings, orchestrator: &AuctionOrchestrator, + services: &RuntimeServices, mut req: Request, ) -> Result> { // Parse request body @@ -79,7 +81,7 @@ pub async fn handle_auction( // Run the auction let result = orchestrator - .run_auction(&auction_request, &context) + .run_auction(&auction_request, &context, services) .await .change_context(TrustedServerError::Auction { message: "Auction orchestration failed".to_string(), diff --git a/crates/trusted-server-core/src/auction/orchestrator.rs b/crates/trusted-server-core/src/auction/orchestrator.rs index 0fc6ee9f..bc73480b 100644 --- a/crates/trusted-server-core/src/auction/orchestrator.rs +++ b/crates/trusted-server-core/src/auction/orchestrator.rs @@ -1,12 +1,13 @@ //! Auction orchestrator for managing multi-provider auctions. use error_stack::{Report, ResultExt}; -use fastly::http::request::{select, PendingRequest}; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::error::TrustedServerError; +use crate::platform::{PlatformPendingRequest, RuntimeServices}; +use crate::proxy::platform_response_to_fastly; use super::config::AuctionConfig; use super::provider::AuctionProvider; @@ -65,6 +66,7 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { let start_time = Instant::now(); @@ -72,12 +74,13 @@ impl AuctionOrchestrator { let (strategy_name, result) = if self.config.has_mediator() { ( "parallel_mediation", - self.run_parallel_mediation(request, context).await?, + self.run_parallel_mediation(request, context, services) + .await?, ) } else { ( "parallel_only", - self.run_parallel_only(request, context).await?, + self.run_parallel_only(request, context, services).await?, ) }; @@ -102,9 +105,12 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { let mediation_start = Instant::now(); - let provider_responses = self.run_providers_parallel(request, context).await?; + let provider_responses = self + .run_providers_parallel(request, context, services) + .await?; let floor_prices = self.floor_prices_by_slot(request); let (mediator_response, winning_bids) = if let Some(mediator_name) = &self.config.mediator { @@ -150,9 +156,20 @@ impl AuctionOrchestrator { message: format!("Mediator {} failed to launch", mediator.provider_name()), })?; - let backend_response = pending.wait().change_context(TrustedServerError::Auction { - message: format!("Mediator {} request failed", mediator.provider_name()), - })?; + let select_result = services + .http_client() + .select(vec![PlatformPendingRequest::new(pending)]) + .await + .change_context(TrustedServerError::Auction { + message: format!("Mediator {} request failed", mediator.provider_name()), + })?; + let platform_resp = + select_result + .ready + .change_context(TrustedServerError::Auction { + message: format!("Mediator {} request failed", mediator.provider_name()), + })?; + let backend_response = platform_response_to_fastly(platform_resp); let response_time_ms = start_time.elapsed().as_millis() as u64; let mediator_resp = mediator @@ -205,8 +222,11 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { - let provider_responses = self.run_providers_parallel(request, context).await?; + let provider_responses = self + .run_providers_parallel(request, context, services) + .await?; let floor_prices = self.floor_prices_by_slot(request); let winning_bids = self.select_winning_bids(&provider_responses, &floor_prices); @@ -227,6 +247,7 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result, Report> { let provider_names = self.config.provider_names(); @@ -248,7 +269,7 @@ impl AuctionOrchestrator { // Maps backend_name -> (provider_name, start_time, provider) let mut backend_to_provider: HashMap = HashMap::new(); - let mut pending_requests: Vec = Vec::new(); + let mut pending_requests: Vec = Vec::new(); for provider_name in provider_names { let provider = match self.providers.get(provider_name) { @@ -315,10 +336,11 @@ impl AuctionOrchestrator { match provider.request_bids(request, &provider_context) { Ok(pending) => { backend_to_provider.insert( - backend_name, + backend_name.clone(), (provider.provider_name(), start_time, provider.as_ref()), ); - pending_requests.push(pending); + pending_requests + .push(PlatformPendingRequest::new(pending).with_backend_name(backend_name)); log::debug!( "Request to '{}' launched successfully", provider.provider_name() @@ -353,13 +375,20 @@ impl AuctionOrchestrator { let mut remaining = pending_requests; while !remaining.is_empty() { - let (result, rest) = select(remaining); - remaining = rest; + let select_result = services + .http_client() + .select(remaining) + .await + .change_context(TrustedServerError::Auction { + message: "HTTP select failed".to_string(), + })?; + remaining = select_result.remaining; - match result { - Ok(response) => { + match select_result.ready { + Ok(platform_response) => { // Identify the provider from the backend name - let backend_name = response.get_backend_name().unwrap_or_default().to_string(); + let backend_name = platform_response.backend_name.clone().unwrap_or_default(); + let response = platform_response_to_fastly(platform_response); if let Some((provider_name, start_time, provider)) = backend_to_provider.remove(&backend_name) @@ -587,6 +616,7 @@ mod tests { use crate::auction::types::{ AdFormat, AdSlot, AuctionContext, AuctionRequest, Bid, MediaType, PublisherInfo, UserInfo, }; + use crate::platform::test_support::noop_services; use crate::test_support::tests::crate_test_settings_str; use fastly::Request; use std::collections::{HashMap, HashSet}; @@ -741,7 +771,9 @@ mod tests { let req = Request::get("https://test.com/test"); let context = create_test_context(&settings, &req); - let result = orchestrator.run_auction(&request, &context).await; + let result = orchestrator + .run_auction(&request, &context, &noop_services()) + .await; assert!(result.is_err()); let err = result.unwrap_err(); diff --git a/crates/trusted-server-core/src/integrations/datadome.rs b/crates/trusted-server-core/src/integrations/datadome.rs index da3a64a2..0ce83b2f 100644 --- a/crates/trusted-server-core/src/integrations/datadome.rs +++ b/crates/trusted-server-core/src/integrations/datadome.rs @@ -71,6 +71,7 @@ use crate::integrations::{ AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; +use crate::platform::RuntimeServices; use crate::settings::{IntegrationConfig, Settings}; const DATADOME_INTEGRATION_ID: &str = "datadome"; @@ -404,6 +405,7 @@ impl IntegrationProxy for DataDomeIntegration { async fn handle( &self, _settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path(); diff --git a/crates/trusted-server-core/src/integrations/didomi.rs b/crates/trusted-server-core/src/integrations/didomi.rs index 7042af70..2cada38f 100644 --- a/crates/trusted-server-core/src/integrations/didomi.rs +++ b/crates/trusted-server-core/src/integrations/didomi.rs @@ -11,6 +11,7 @@ use validator::Validate; use crate::backend::BackendConfig; use crate::error::TrustedServerError; use crate::integrations::{IntegrationEndpoint, IntegrationProxy, IntegrationRegistration}; +use crate::platform::RuntimeServices; use crate::settings::{IntegrationConfig, Settings}; const DIDOMI_INTEGRATION_ID: &str = "didomi"; @@ -198,6 +199,7 @@ impl IntegrationProxy for DidomiIntegration { async fn handle( &self, _settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path(); diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs index 646f720f..64c41ec3 100644 --- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs +++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs @@ -28,6 +28,7 @@ use crate::integrations::{ IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, IntegrationScriptContext, IntegrationScriptRewriter, ScriptRewriteAction, }; +use crate::platform::RuntimeServices; use crate::proxy::{proxy_request, ProxyRequestConfig}; use crate::settings::{IntegrationConfig, Settings}; @@ -372,6 +373,7 @@ impl IntegrationProxy for GoogleTagManagerIntegration { async fn handle( &self, settings: &Settings, + services: &RuntimeServices, mut req: Request, ) -> Result> { let path = req.get_path().to_string(); @@ -427,7 +429,7 @@ impl IntegrationProxy for GoogleTagManagerIntegration { } }; - let mut response = proxy_request(settings, req, proxy_config) + let mut response = proxy_request(settings, req, proxy_config, services) .await .change_context(Self::error("Failed to proxy GTM request"))?; @@ -514,6 +516,7 @@ mod tests { use crate::settings::Settings; use crate::streaming_processor::{Compression, PipelineConfig, StreamingPipeline}; + use crate::platform::test_support::noop_services; use crate::test_support::tests::crate_test_settings_str; use fastly::http::Method; use std::io::Cursor; @@ -1113,7 +1116,7 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= let settings = make_settings(); let response = integration - .handle(&settings, req) + .handle(&settings, &noop_services(), req) .await .expect("handle should not return error"); @@ -1148,7 +1151,7 @@ j=d.createElement(s),dl=l!='dataLayer'?'&l='+l:'';j.async=true;j.src= let settings = make_settings(); let response = integration - .handle(&settings, req) + .handle(&settings, &noop_services(), req) .await .expect("handle should not return error"); diff --git a/crates/trusted-server-core/src/integrations/gpt.rs b/crates/trusted-server-core/src/integrations/gpt.rs index ed8562b5..1a68962e 100644 --- a/crates/trusted-server-core/src/integrations/gpt.rs +++ b/crates/trusted-server-core/src/integrations/gpt.rs @@ -49,6 +49,7 @@ use crate::integrations::{ IntegrationEndpoint, IntegrationHeadInjector, IntegrationHtmlContext, IntegrationProxy, IntegrationRegistration, }; +use crate::platform::RuntimeServices; use crate::proxy::{proxy_request, ProxyRequestConfig}; use crate::settings::{IntegrationConfig, Settings}; @@ -238,12 +239,13 @@ impl GptIntegration { async fn proxy_gpt_asset( &self, settings: &Settings, + services: &RuntimeServices, req: Request, target_url: &str, context: &str, ) -> Result> { let config = Self::build_proxy_config(target_url, &req); - let response = proxy_request(settings, req, config) + let response = proxy_request(settings, req, config, services) .await .change_context(Self::error(context))?; @@ -287,12 +289,14 @@ impl GptIntegration { async fn handle_script_serving( &self, settings: &Settings, + services: &RuntimeServices, req: Request, ) -> Result> { let script_url = &self.config.script_url; log::info!("Fetching GPT script from: {}", script_url); self.proxy_gpt_asset( settings, + services, req, script_url, &format!("Failed to fetch GPT script from {script_url}"), @@ -309,6 +313,7 @@ impl GptIntegration { async fn handle_pagead_proxy( &self, settings: &Settings, + services: &RuntimeServices, req: Request, ) -> Result> { let original_path = req.get_path(); @@ -320,6 +325,7 @@ impl GptIntegration { log::info!("GPT proxy: forwarding to {}", target_url); self.proxy_gpt_asset( settings, + services, req, &target_url, &format!("Failed to fetch GPT resource from {target_url}"), @@ -376,16 +382,17 @@ impl IntegrationProxy for GptIntegration { async fn handle( &self, settings: &Settings, + services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path(); if path == "/integrations/gpt/script" { - self.handle_script_serving(settings, req).await + self.handle_script_serving(settings, services, req).await } else if path.starts_with("/integrations/gpt/pagead/") || path.starts_with("/integrations/gpt/tag/") { - self.handle_pagead_proxy(settings, req).await + self.handle_pagead_proxy(settings, services, req).await } else { Err(Report::new(Self::error(format!( "Unknown GPT route: {}", diff --git a/crates/trusted-server-core/src/integrations/lockr.rs b/crates/trusted-server-core/src/integrations/lockr.rs index 5469c294..ad009130 100644 --- a/crates/trusted-server-core/src/integrations/lockr.rs +++ b/crates/trusted-server-core/src/integrations/lockr.rs @@ -24,6 +24,7 @@ use crate::integrations::{ AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; +use crate::platform::RuntimeServices; use crate::settings::{IntegrationConfig, Settings}; const LOCKR_INTEGRATION_ID: &str = "lockr"; @@ -304,6 +305,7 @@ impl IntegrationProxy for LockrIntegration { async fn handle( &self, settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path(); diff --git a/crates/trusted-server-core/src/integrations/permutive.rs b/crates/trusted-server-core/src/integrations/permutive.rs index 179d7766..0946a54a 100644 --- a/crates/trusted-server-core/src/integrations/permutive.rs +++ b/crates/trusted-server-core/src/integrations/permutive.rs @@ -19,6 +19,7 @@ use crate::integrations::{ AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; +use crate::platform::RuntimeServices; use crate::settings::{IntegrationConfig, Settings}; const PERMUTIVE_INTEGRATION_ID: &str = "permutive"; @@ -560,6 +561,7 @@ impl IntegrationProxy for PermutiveIntegration { async fn handle( &self, settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path(); diff --git a/crates/trusted-server-core/src/integrations/prebid.rs b/crates/trusted-server-core/src/integrations/prebid.rs index f11ad33b..0dacaee2 100644 --- a/crates/trusted-server-core/src/integrations/prebid.rs +++ b/crates/trusted-server-core/src/integrations/prebid.rs @@ -29,6 +29,7 @@ use crate::openrtb::{ OpenRtbRequest, PrebidExt, PrebidImpExt, Publisher, Regs, RegsExt, RequestExt, Site, ToExt, TrustedServerExt, User, UserExt, }; +use crate::platform::RuntimeServices; use crate::request_signing::{RequestSigner, SigningParams, SIGNING_VERSION}; use crate::settings::{IntegrationConfig, Settings}; @@ -314,6 +315,7 @@ impl IntegrationProxy for PrebidIntegration { async fn handle( &self, _settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let path = req.get_path().to_string(); diff --git a/crates/trusted-server-core/src/integrations/registry.rs b/crates/trusted-server-core/src/integrations/registry.rs index 6df46dd1..8cfdd2f2 100644 --- a/crates/trusted-server-core/src/integrations/registry.rs +++ b/crates/trusted-server-core/src/integrations/registry.rs @@ -11,6 +11,7 @@ use matchit::Router; use crate::constants::HEADER_X_SYNTHETIC_ID; use crate::cookies::set_synthetic_cookie; use crate::error::TrustedServerError; +use crate::platform::RuntimeServices; use crate::settings::Settings; use crate::synthetic::get_or_generate_synthetic_id; @@ -258,6 +259,7 @@ pub trait IntegrationProxy: Send + Sync { async fn handle( &self, settings: &Settings, + services: &RuntimeServices, req: Request, ) -> Result>; @@ -652,6 +654,7 @@ impl IntegrationRegistry { method: &Method, path: &str, settings: &Settings, + services: &RuntimeServices, mut req: Request, ) -> Option>> { if let Some((proxy, _)) = self.find_route(method, path) { @@ -665,7 +668,7 @@ impl IntegrationRegistry { req.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); } - let mut result = proxy.handle(settings, req).await; + let mut result = proxy.handle(settings, services, req).await; // Set synthetic ID header on successful responses if let Ok(ref mut response) = result { @@ -943,6 +946,7 @@ impl IntegrationRegistry { #[cfg(test)] mod tests { use super::*; + use crate::platform::test_support::noop_services; // Mock integration proxy for testing struct MockProxy; @@ -960,6 +964,7 @@ mod tests { async fn handle( &self, _settings: &Settings, + _services: &RuntimeServices, _req: Request, ) -> Result> { Ok(Response::new()) @@ -1243,6 +1248,7 @@ mod tests { async fn handle( &self, _settings: &Settings, + _services: &RuntimeServices, _req: Request, ) -> Result> { // Return a simple response without the synthetic ID header. @@ -1272,6 +1278,7 @@ mod tests { &Method::GET, "/integrations/test/synthetic", &settings, + &noop_services(), req, )); @@ -1323,6 +1330,7 @@ mod tests { &Method::GET, "/integrations/test/synthetic", &settings, + &noop_services(), req, )) .expect("should handle proxy request"); @@ -1383,6 +1391,7 @@ mod tests { &Method::GET, "/integrations/test/synthetic", &settings, + &noop_services(), req, )) .expect("should handle proxy request"); @@ -1433,6 +1442,7 @@ mod tests { &Method::POST, "/integrations/test/synthetic", &settings, + &noop_services(), req, )); diff --git a/crates/trusted-server-core/src/integrations/testlight.rs b/crates/trusted-server-core/src/integrations/testlight.rs index 91c1cc75..e7385464 100644 --- a/crates/trusted-server-core/src/integrations/testlight.rs +++ b/crates/trusted-server-core/src/integrations/testlight.rs @@ -13,6 +13,7 @@ use crate::integrations::{ AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; +use crate::platform::RuntimeServices; use crate::proxy::{proxy_request, ProxyRequestConfig}; use crate::settings::{IntegrationConfig, Settings}; use crate::synthetic::get_synthetic_id; @@ -140,6 +141,7 @@ impl IntegrationProxy for TestlightIntegration { async fn handle( &self, settings: &Settings, + services: &RuntimeServices, mut req: Request, ) -> Result> { let mut payload = serde_json::from_slice::(&req.take_body_bytes()) @@ -172,7 +174,7 @@ impl IntegrationProxy for TestlightIntegration { HeaderValue::from_static("application/json"), )); - let mut response = proxy_request(settings, req, proxy_config) + let mut response = proxy_request(settings, req, proxy_config, services) .await .change_context(Self::error("Failed to contact upstream integration"))?; diff --git a/crates/trusted-server-core/src/platform/test_support.rs b/crates/trusted-server-core/src/platform/test_support.rs index 3bdb6b2b..a64fab96 100644 --- a/crates/trusted-server-core/src/platform/test_support.rs +++ b/crates/trusted-server-core/src/platform/test_support.rs @@ -1,7 +1,8 @@ +use std::collections::VecDeque; use std::net::IpAddr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; -use error_stack::Report; +use error_stack::{Report, ResultExt}; use super::{ ClientInfo, GeoInfo, PlatformBackend, PlatformBackendSpec, PlatformConfigStore, PlatformError, @@ -95,6 +96,159 @@ impl PlatformHttpClient for NoopHttpClient { } } +// --------------------------------------------------------------------------- +// StubBackend +// --------------------------------------------------------------------------- + +/// Test stub for [`PlatformBackend`] that returns `"stub-backend"` for any +/// spec, allowing callers to proceed past backend registration. +pub(crate) struct StubBackend; + +impl PlatformBackend for StubBackend { + fn predict_name(&self, _spec: &PlatformBackendSpec) -> Result> { + Ok("stub-backend".to_string()) + } + + fn ensure(&self, _spec: &PlatformBackendSpec) -> Result> { + Ok("stub-backend".to_string()) + } +} + +// --------------------------------------------------------------------------- +// StubHttpClient +// --------------------------------------------------------------------------- + +/// Canned response carried by a [`StubPendingResponse`] through `send_async` +/// and resolved by [`StubHttpClient::select`]. +struct StubPendingResponse { + backend_name: String, + status: u16, + body: Vec, +} + +/// Test stub for [`PlatformHttpClient`] that records call backend names and +/// returns pre-queued canned responses for `send`, `send_async`, and `select`. +/// +/// Responses are stored as `(status_code, body_bytes)` to remain [`Send`]. +/// [`PlatformResponse`] contains [`edgezero_core::body::Body`] which wraps a +/// `LocalBoxStream` that is `!Send`, so it cannot be stored directly in a +/// `Mutex` field. +/// +/// Use [`push_response`](Self::push_response) to enqueue responses before +/// exercising the code under test, then inspect +/// [`recorded_backend_names`](Self::recorded_backend_names) to assert call +/// sites. +pub(crate) struct StubHttpClient { + calls: Mutex>, + // (status_code, body_bytes) — kept Send by avoiding Body::Stream + responses: Mutex)>>, +} + +impl StubHttpClient { + pub fn new() -> Self { + Self { + calls: Mutex::new(Vec::new()), + responses: Mutex::new(VecDeque::new()), + } + } + + /// Queue a canned response by status code and body bytes. + pub fn push_response(&self, status: u16, body: Vec) { + self.responses + .lock() + .expect("should lock responses") + .push_back((status, body)); + } + + /// Return backend names recorded across all `send` calls, in order. + pub fn recorded_backend_names(&self) -> Vec { + self.calls.lock().expect("should lock calls").clone() + } +} + +// ?Send matches PlatformHttpClient. See http.rs for the full rationale. +#[async_trait::async_trait(?Send)] +impl PlatformHttpClient for StubHttpClient { + async fn send( + &self, + request: PlatformHttpRequest, + ) -> Result> { + self.calls + .lock() + .expect("should lock calls") + .push(request.backend_name.clone()); + + let (status, body_bytes) = self + .responses + .lock() + .expect("should lock responses") + .pop_front() + .ok_or_else(|| Report::new(PlatformError::HttpClient))?; + + let edge_response = edgezero_core::http::response_builder() + .status(status) + .body(edgezero_core::body::Body::from(body_bytes)) + .change_context(PlatformError::HttpClient)?; + + Ok(PlatformResponse::new(edge_response)) + } + + async fn send_async( + &self, + request: PlatformHttpRequest, + ) -> Result> { + let backend_name = request.backend_name.clone(); + self.calls + .lock() + .expect("should lock calls") + .push(backend_name.clone()); + + let (status, body_bytes) = self + .responses + .lock() + .expect("should lock responses") + .pop_front() + .ok_or_else(|| Report::new(PlatformError::HttpClient))?; + + let pending = StubPendingResponse { + backend_name: backend_name.clone(), + status, + body: body_bytes, + }; + Ok(PlatformPendingRequest::new(pending).with_backend_name(backend_name)) + } + + async fn select( + &self, + mut pending_requests: Vec, + ) -> Result> { + if pending_requests.is_empty() { + return Err(Report::new(PlatformError::HttpClient) + .attach("select called with empty pending_requests list")); + } + + let ready_platform = pending_requests.remove(0); + let stub = ready_platform + .downcast::() + .map_err(|_| { + Report::new(PlatformError::HttpClient) + .attach("unexpected inner type in StubHttpClient::select") + })?; + + let edge_response = edgezero_core::http::response_builder() + .status(stub.status) + .body(edgezero_core::body::Body::from(stub.body)) + .change_context(PlatformError::HttpClient)?; + + let ready = Ok(PlatformResponse::new(edge_response).with_backend_name(stub.backend_name)); + + Ok(PlatformSelectResult { + ready, + remaining: pending_requests, + }) + } +} + pub(crate) struct NoopGeo; impl PlatformGeo for NoopGeo { @@ -124,3 +278,168 @@ pub(crate) fn build_services_with_config( pub(crate) fn noop_services() -> RuntimeServices { build_services_with_config(NoopConfigStore) } + +/// Build a [`RuntimeServices`] with a [`StubBackend`] and the given HTTP client. +/// +/// Useful for tests that need to verify `services.http_client()` call sites. +pub(crate) fn build_services_with_http_client( + http_client: Arc, +) -> RuntimeServices { + RuntimeServices::builder() + .config_store(Arc::new(NoopConfigStore)) + .secret_store(Arc::new(NoopSecretStore)) + .kv_store(Arc::new(edgezero_core::key_value_store::NoopKvStore)) + .backend(Arc::new(StubBackend)) + .http_client(http_client) + .geo(Arc::new(NoopGeo)) + .client_info(ClientInfo { + client_ip: None, + tls_protocol: None, + tls_cipher: None, + }) + .build() +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use edgezero_core::body::Body; + use edgezero_core::http::request_builder; + + use super::*; + + #[test] + fn stub_http_client_records_send_calls_and_returns_canned_response() { + let stub = StubHttpClient::new(); + stub.push_response(200, b"hello".to_vec()); + + let req = PlatformHttpRequest::new( + request_builder() + .method("GET") + .uri("https://example.com/test") + .body(Body::empty()) + .expect("should build request"), + "stub-backend", + ); + let result = futures::executor::block_on(stub.send(req)); + + assert!(result.is_ok(), "should return canned response"); + let names = stub.recorded_backend_names(); + assert_eq!( + names, + vec!["stub-backend"], + "should record the backend name" + ); + } + + #[test] + fn stub_http_client_returns_error_when_no_response_queued() { + let stub = StubHttpClient::new(); + + let req = PlatformHttpRequest::new( + request_builder() + .method("GET") + .uri("https://example.com/") + .body(Body::empty()) + .expect("should build request"), + "stub-backend", + ); + let result = futures::executor::block_on(stub.send(req)); + + assert!(result.is_err(), "should return error when queue is empty"); + assert!( + matches!( + result.unwrap_err().current_context(), + PlatformError::HttpClient + ), + "should be HttpClient error" + ); + } + + #[test] + fn stub_http_client_send_async_and_select_fan_out() { + let stub = StubHttpClient::new(); + stub.push_response(200, b"provider-a".to_vec()); + stub.push_response(201, b"provider-b".to_vec()); + + let make_req = |backend: &str| { + PlatformHttpRequest::new( + request_builder() + .method("GET") + .uri("https://example.com/bid") + .body(Body::empty()) + .expect("should build request"), + backend, + ) + }; + + let pending_a = futures::executor::block_on(stub.send_async(make_req("backend-a"))) + .expect("should start request a"); + let pending_b = futures::executor::block_on(stub.send_async(make_req("backend-b"))) + .expect("should start request b"); + + assert_eq!( + pending_a.backend_name(), + Some("backend-a"), + "should attach backend name to pending request a" + ); + assert_eq!( + pending_b.backend_name(), + Some("backend-b"), + "should attach backend name to pending request b" + ); + + let result = futures::executor::block_on(stub.select(vec![pending_a, pending_b])) + .expect("should select first ready request"); + + let ready_resp = result.ready.expect("should have a ready response"); + assert_eq!( + ready_resp.backend_name.as_deref(), + Some("backend-a"), + "should correlate ready response to backend-a" + ); + assert_eq!( + result.remaining.len(), + 1, + "should have one remaining request" + ); + assert_eq!( + result.remaining[0].backend_name(), + Some("backend-b"), + "should preserve backend name on remaining request" + ); + + let names = stub.recorded_backend_names(); + assert_eq!( + names, + vec!["backend-a", "backend-b"], + "should record both send_async calls in order" + ); + } + + #[test] + fn stub_http_client_select_returns_error_when_empty() { + let stub = StubHttpClient::new(); + let err = futures::executor::block_on(stub.select(vec![])) + .expect_err("should return error for empty list"); + assert!( + matches!(err.current_context(), PlatformError::HttpClient), + "should be HttpClient error" + ); + } + + #[test] + fn stub_backend_returns_fixed_name() { + let stub = StubBackend; + let spec = PlatformBackendSpec { + scheme: "https".to_string(), + host: "example.com".to_string(), + port: None, + certificate_check: true, + first_byte_timeout: Duration::from_secs(15), + }; + let name = stub.ensure(&spec).expect("should return a backend name"); + assert_eq!(name, "stub-backend", "should return fixed name"); + } +} diff --git a/crates/trusted-server-core/src/proxy.rs b/crates/trusted-server-core/src/proxy.rs index 01257c08..02512398 100644 --- a/crates/trusted-server-core/src/proxy.rs +++ b/crates/trusted-server-core/src/proxy.rs @@ -1,4 +1,6 @@ use crate::http_util::compute_encrypted_sha256_token; +use edgezero_core::body::Body as EdgeBody; +use edgezero_core::http::{request_builder as edge_request_builder, Uri as EdgeUri}; use error_stack::{Report, ResultExt}; use fastly::http::{header, HeaderValue, Method, StatusCode}; use fastly::{Request, Response}; @@ -11,6 +13,9 @@ use crate::constants::{ }; use crate::creative::{CreativeCssProcessor, CreativeHtmlProcessor}; use crate::error::TrustedServerError; +use crate::platform::{ + PlatformBackendSpec, PlatformHttpRequest, PlatformResponse, RuntimeServices, +}; use crate::settings::Settings; use crate::streaming_processor::{Compression, PipelineConfig, StreamProcessor, StreamingPipeline}; use crate::synthetic::get_synthetic_id; @@ -18,6 +23,46 @@ use crate::synthetic::get_synthetic_id; /// Chunk size used for streaming content through the rewrite pipeline. const STREAMING_CHUNK_SIZE: usize = 8192; +/// Headers copied from the original client request to the upstream proxy request. +const PROXY_FORWARD_HEADERS: [header::HeaderName; 5] = [ + HEADER_USER_AGENT, + HEADER_ACCEPT, + HEADER_ACCEPT_LANGUAGE, + HEADER_REFERER, + HEADER_X_FORWARDED_FOR, +]; + +/// Convert a platform-neutral response into a [`fastly::Response`] for downstream processing. +/// +/// Shared with `auction/orchestrator.rs`. Both files will migrate off `fastly::Response` +/// entirely in Phase 2, at which point this conversion helper will be removed. +/// +/// # Panics (debug builds only) +/// +/// Panics when `platform_resp` carries a `Body::Stream` body, which indicates a +/// programming error — all outbound proxy bodies are built from byte slices and +/// are therefore always `Body::Once`. +pub(crate) fn platform_response_to_fastly(platform_resp: PlatformResponse) -> Response { + let (parts, body) = platform_resp.response.into_parts(); + debug_assert!( + matches!(&body, EdgeBody::Once(_)), + "unexpected Body::Stream in platform response conversion: body will be empty" + ); + let body_bytes = match body { + EdgeBody::Once(bytes) => bytes.to_vec(), + EdgeBody::Stream(_) => { + log::warn!("streaming platform response body; body will be empty"); + vec![] + } + }; + let mut resp = Response::from_status(parts.status.as_u16()); + for (name, value) in parts.headers.iter() { + resp.set_header(name.as_str(), value.as_bytes()); + } + resp.set_body(body_bytes); + resp +} + #[derive(Deserialize)] struct ProxySignReq { url: String, @@ -106,23 +151,6 @@ impl<'a> ProxyRequestConfig<'a> { /// We override the client's Accept-Encoding to only advertise these. const SUPPORTED_ENCODINGS: &str = "gzip, deflate, br"; -/// Copy a curated set of request headers to a proxied request. -fn copy_proxy_forward_headers(src: &Request, dst: &mut Request) { - for header_name in [ - HEADER_USER_AGENT, - HEADER_ACCEPT, - HEADER_ACCEPT_LANGUAGE, - HEADER_REFERER, - HEADER_X_FORWARDED_FOR, - ] { - if let Some(v) = src.get_header(&header_name) { - dst.set_header(&header_name, v); - } - } - // Only advertise encodings we can decompress (excludes zstd, etc.) - dst.set_header(HEADER_ACCEPT_ENCODING, SUPPORTED_ENCODINGS); -} - /// Rebuild a response with a new body, preserving headers except Content-Length. /// If `preserve_encoding` is true, the Content-Encoding header is kept (for compressed responses). /// If false, Content-Encoding is stripped (for decompressed responses). @@ -400,7 +428,7 @@ fn finalize_response( struct ProxyRequestHeaders<'a> { additional_headers: &'a [(header::HeaderName, HeaderValue)], copy_request_headers: bool, - allowed_domains: &'a [String], + services: &'a RuntimeServices, } /// Proxy a request to a clear target URL while reusing creative rewrite logic. @@ -417,6 +445,7 @@ pub async fn proxy_request( settings: &Settings, req: Request, config: ProxyRequestConfig<'_>, + services: &RuntimeServices, ) -> Result> { let ProxyRequestConfig { target_url, @@ -426,7 +455,7 @@ pub async fn proxy_request( headers, copy_request_headers, stream_passthrough, - allowed_domains, + allowed_domains: _, } = config; let mut target_url_parsed = url::Url::parse(target_url).map_err(|_| { @@ -448,7 +477,7 @@ pub async fn proxy_request( ProxyRequestHeaders { additional_headers: &headers, copy_request_headers, - allowed_domains, + services, }, stream_passthrough, ) @@ -559,7 +588,7 @@ async fn proxy_with_redirects( })); } - if !redirect_is_permitted(request_headers.allowed_domains, host) { + if !redirect_is_permitted(&settings.proxy.allowed_domains, host) { log::warn!( "request to `{}` blocked: host not in proxy allowed_domains", host @@ -569,29 +598,61 @@ async fn proxy_with_redirects( })); } - let backend_name = crate::backend::BackendConfig::new(&scheme, host) - .port(parsed_url.port()) - .certificate_check(settings.proxy.certificate_check) - .ensure()?; + let backend_name = request_headers + .services + .backend() + .ensure(&PlatformBackendSpec { + scheme: scheme.clone(), + host: host.to_string(), + port: parsed_url.port(), + certificate_check: settings.proxy.certificate_check, + first_byte_timeout: Duration::from_secs(15), + }) + .change_context(TrustedServerError::Proxy { + message: "backend registration failed".to_string(), + })?; + + let mut builder = edge_request_builder().method(current_method.clone()).uri( + current_url + .parse::() + .change_context(TrustedServerError::Proxy { + message: "invalid url".to_string(), + })?, + ); - let mut proxy_req = Request::new(current_method.clone(), ¤t_url); if request_headers.copy_request_headers { - copy_proxy_forward_headers(req, &mut proxy_req); - } - if let Some(body_bytes) = body { - proxy_req.set_body(body_bytes.to_vec()); + for header_name in PROXY_FORWARD_HEADERS { + if let Some(v) = req.get_header(&header_name) { + builder = builder.header(header_name.as_str(), v.as_bytes()); + } + } + builder = builder.header( + HEADER_ACCEPT_ENCODING.as_str(), + SUPPORTED_ENCODINGS.as_bytes(), + ); } - for (name, value) in request_headers.additional_headers { - proxy_req.set_header(name.clone(), value.clone()); + builder = builder.header(name.clone(), value.clone()); } - - let beresp = proxy_req - .send(&backend_name) + let body_bytes = body.map(<[u8]>::to_vec).unwrap_or_default(); + let edge_req = + builder + .body(EdgeBody::from(body_bytes)) + .change_context(TrustedServerError::Proxy { + message: "failed to build proxy request".to_string(), + })?; + + let platform_resp = request_headers + .services + .http_client() + .send(PlatformHttpRequest::new(edge_req, backend_name)) + .await .change_context(TrustedServerError::Proxy { message: "Failed to proxy".to_string(), })?; + let beresp = platform_response_to_fastly(platform_resp); + if !follow_redirects { return finalize_response(settings, req, ¤t_url, beresp, stream_passthrough); } @@ -647,7 +708,7 @@ async fn proxy_with_redirects( })); } }; - if !redirect_is_permitted(request_headers.allowed_domains, next_host) { + if !redirect_is_permitted(&settings.proxy.allowed_domains, next_host) { log::warn!( "redirect to `{}` blocked: host not in proxy allowed_domains", next_host @@ -693,6 +754,7 @@ async fn proxy_with_redirects( /// Returns an error if the signed target cannot be reconstructed or validation fails. pub async fn handle_first_party_proxy( settings: &Settings, + services: &RuntimeServices, req: Request, ) -> Result> { // Parse, reconstruct, and validate the signed target URL @@ -712,6 +774,7 @@ pub async fn handle_first_party_proxy( stream_passthrough: false, allowed_domains: &settings.proxy.allowed_domains, }, + services, ) .await } @@ -728,6 +791,7 @@ pub async fn handle_first_party_proxy( /// Returns an error if the signed target cannot be reconstructed or validation fails. pub async fn handle_first_party_click( settings: &Settings, + _services: &RuntimeServices, req: Request, ) -> Result> { let SignedTarget { @@ -814,6 +878,7 @@ pub async fn handle_first_party_click( /// Returns an error if JSON parsing fails, the URL cannot be parsed, or the URL uses an unsupported scheme. pub async fn handle_first_party_proxy_sign( settings: &Settings, + _services: &RuntimeServices, mut req: Request, ) -> Result> { let method = req.get_method().clone(); @@ -921,6 +986,7 @@ struct ProxyRebuildResp { /// Returns an error if JSON parsing fails, the URL is invalid, or the request body cannot be read. pub async fn handle_first_party_proxy_rebuild( settings: &Settings, + _services: &RuntimeServices, mut req: Request, ) -> Result> { let method = req.get_method().clone(); @@ -1181,20 +1247,15 @@ fn reconstruct_and_validate_signed_target( #[cfg(test)] mod tests { use super::{ - copy_proxy_forward_headers, handle_first_party_click, handle_first_party_proxy, - handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, is_host_allowed, + handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, + handle_first_party_proxy_sign, is_host_allowed, proxy_request, reconstruct_and_validate_signed_target, redirect_is_permitted, ProxyRequestConfig, - SUPPORTED_ENCODINGS, }; + use crate::constants::HEADER_ACCEPT; + use crate::creative; use crate::error::{IntoHttpResponse, TrustedServerError}; + use crate::platform::test_support::noop_services; use crate::test_support::tests::create_test_settings; - use crate::{ - constants::{ - HEADER_ACCEPT, HEADER_ACCEPT_ENCODING, HEADER_ACCEPT_LANGUAGE, HEADER_REFERER, - HEADER_USER_AGENT, HEADER_X_FORWARDED_FOR, - }, - creative, - }; use error_stack::Report; use fastly::http::{header, HeaderValue, Method, StatusCode}; use fastly::{Request, Response}; @@ -1203,9 +1264,10 @@ mod tests { async fn proxy_missing_param_returns_400() { let settings = create_test_settings(); let req = Request::new(Method::GET, "https://example.com/first-party/proxy"); - let err: Report = handle_first_party_proxy(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_proxy(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } @@ -1217,9 +1279,10 @@ mod tests { Method::GET, "https://example.com/first-party/proxy?tsurl=https%3A%2F%2Fcdn.example%2Fa.png", ); - let err: Report = handle_first_party_proxy(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_proxy(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } @@ -1231,7 +1294,7 @@ mod tests { }); let mut req = Request::new(Method::POST, "https://edge.example/first-party/sign"); req.set_body(body.to_string()); - let mut resp = handle_first_party_proxy_sign(&settings, req) + let mut resp = handle_first_party_proxy_sign(&settings, &noop_services(), req) .await .expect("sign ok"); assert_eq!(resp.get_status(), StatusCode::OK); @@ -1253,9 +1316,10 @@ mod tests { }); let mut req = Request::new(Method::POST, "https://edge.example/first-party/sign"); req.set_body(body.to_string()); - let err: Report = handle_first_party_proxy_sign(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_proxy_sign(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } @@ -1267,7 +1331,7 @@ mod tests { }); let mut req = Request::new(Method::POST, "https://edge.example/first-party/sign"); req.set_body(body.to_string()); - let mut resp = handle_first_party_proxy_sign(&settings, req) + let mut resp = handle_first_party_proxy_sign(&settings, &noop_services(), req) .await .expect("should sign URL with non-standard port"); assert_eq!( @@ -1354,9 +1418,10 @@ mod tests { async fn click_missing_params_returns_400() { let settings = create_test_settings(); let req = Request::new(Method::GET, "https://edge.example/first-party/click"); - let err: Report = handle_first_party_click(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_click(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } @@ -1376,7 +1441,7 @@ mod tests { sig ), ); - let resp = handle_first_party_click(&settings, req) + let resp = handle_first_party_click(&settings, &noop_services(), req) .await .expect("should redirect"); assert_eq!(resp.get_status(), StatusCode::FOUND); @@ -1406,7 +1471,7 @@ mod tests { let valid_synthetic_id = crate::test_support::tests::VALID_SYNTHETIC_ID; req.set_header(crate::constants::HEADER_X_SYNTHETIC_ID, valid_synthetic_id); - let resp = handle_first_party_click(&settings, req) + let resp = handle_first_party_click(&settings, &noop_services(), req) .await .expect("should redirect"); @@ -1442,7 +1507,7 @@ mod tests { "https://edge.example/first-party/proxy-rebuild", ); req.set_body(serde_json::to_string(&body).expect("test JSON should serialize")); - let mut resp = handle_first_party_proxy_rebuild(&settings, req) + let mut resp = handle_first_party_proxy_rebuild(&settings, &noop_services(), req) .await .expect("rebuild ok"); assert_eq!(resp.get_status(), StatusCode::OK); @@ -1520,9 +1585,10 @@ mod tests { // Build a first-party proxy URL with a token for the unsupported scheme let first_party = creative::build_proxy_url(&settings, clear); let req = Request::new(Method::GET, format!("https://edge.example{}", first_party)); - let err: Report = handle_first_party_proxy(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_proxy(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } @@ -1540,77 +1606,20 @@ mod tests { sig ); let req = Request::new(Method::GET, &url); - let err: Report = handle_first_party_proxy(&settings, req) - .await - .expect_err("expected error"); + let err: Report = + handle_first_party_proxy(&settings, &noop_services(), req) + .await + .expect_err("expected error"); assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } - #[test] - fn header_copy_copies_curated_set() { - let mut src = Request::new(Method::GET, "https://edge.example/first-party/proxy"); - src.set_header(HEADER_USER_AGENT, "UA/1.0"); - src.set_header(HEADER_ACCEPT, "image/*"); - src.set_header(HEADER_ACCEPT_LANGUAGE, "en-US"); - src.set_header(HEADER_ACCEPT_ENCODING, "gzip"); - src.set_header(HEADER_REFERER, "https://pub.example/page"); - src.set_header(HEADER_X_FORWARDED_FOR, "203.0.113.1"); - - let mut dst = Request::new(Method::GET, "https://cdn.example/a.png"); - copy_proxy_forward_headers(&src, &mut dst); - - assert_eq!( - dst.get_header(HEADER_USER_AGENT) - .expect("User-Agent header should be copied") - .to_str() - .expect("User-Agent should be valid UTF-8"), - "UA/1.0" - ); - assert_eq!( - dst.get_header(HEADER_ACCEPT) - .expect("Accept header should be copied") - .to_str() - .expect("Accept should be valid UTF-8"), - "image/*" - ); - assert_eq!( - dst.get_header(HEADER_ACCEPT_LANGUAGE) - .expect("Accept-Language header should be copied") - .to_str() - .expect("Accept-Language should be valid UTF-8"), - "en-US" - ); - // Accept-Encoding is overridden to only include supported encodings - assert_eq!( - dst.get_header(HEADER_ACCEPT_ENCODING) - .expect("Accept-Encoding header should be set") - .to_str() - .expect("Accept-Encoding should be valid UTF-8"), - SUPPORTED_ENCODINGS - ); - assert_eq!( - dst.get_header(HEADER_REFERER) - .expect("Referer header should be copied") - .to_str() - .expect("Referer should be valid UTF-8"), - "https://pub.example/page" - ); - assert_eq!( - dst.get_header(HEADER_X_FORWARDED_FOR) - .expect("X-Forwarded-For header should be copied") - .to_str() - .expect("X-Forwarded-For should be valid UTF-8"), - "203.0.113.1" - ); - } - #[tokio::test] async fn click_sets_cache_control_no_store_private() { let settings = create_test_settings(); let clear = "https://cdn.example/landing.html?x=1"; let first_party = creative::build_click_url(&settings, clear); let req = Request::new(Method::GET, format!("https://edge.example{}", first_party)); - let resp = handle_first_party_click(&settings, req) + let resp = handle_first_party_click(&settings, &noop_services(), req) .await .expect("should redirect"); assert_eq!(resp.get_status(), StatusCode::FOUND); @@ -1911,6 +1920,47 @@ mod tests { ); } + // --- Platform HTTP client integration --- + + #[tokio::test] + async fn proxy_request_calls_platform_http_client_send() { + use crate::platform::test_support::{build_services_with_http_client, StubHttpClient}; + use std::sync::Arc; + + let stub = Arc::new(StubHttpClient::new()); + stub.push_response(200, b"ok".to_vec()); + let services = build_services_with_http_client( + Arc::clone(&stub) as Arc + ); + let settings = create_test_settings(); + let req = Request::new(Method::GET, "https://example.com/"); + + let result = proxy_request( + &settings, + req, + ProxyRequestConfig { + target_url: "https://example.com/resource", + follow_redirects: false, + forward_synthetic_id: false, + body: None, + headers: Vec::new(), + copy_request_headers: false, + stream_passthrough: false, + allowed_domains: &[], + }, + &services, + ) + .await; + + assert!(result.is_ok(), "should proxy successfully"); + let calls = stub.recorded_backend_names(); + assert_eq!(calls.len(), 1, "should call send exactly once"); + assert_eq!( + calls[0], "stub-backend", + "should use backend name from StubBackend" + ); + } + // --- is_host_allowed --- #[test] @@ -2148,7 +2198,8 @@ mod tests { token, ); let req = Request::new(Method::GET, url); - let err = handle_first_party_proxy(&settings, req) + let services = crate::platform::test_support::noop_services(); + let err = handle_first_party_proxy(&settings, &services, req) .await .expect_err("should block initial target not in allowlist"); assert_eq!(