diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index e1d61726..631861ca 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,12 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::{ + consensus::{Transaction, transaction::SignerRecoverable}, + eips::Decodable2718, + primitives::Bytes, + providers::Provider, + rlp::Buf, +}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ @@ -7,7 +14,7 @@ use tokio::{ task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, error, trace, trace_span}; +use tracing::{Instrument, debug, debug_span, error, trace, trace_span}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -71,6 +78,84 @@ impl BundlePoller { } } + /// Spawns a tokio task to check the nonces of all host transactions in a bundle + /// before sending it to the cache task via the outbound channel. + /// + /// Bundles with stale host transaction nonces are dropped to prevent them from + /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + tokio::spawn(async move { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + + // If no host transactions, forward directly + if bundle.bundle.host_txs.is_empty() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + return; + } + + let Ok(host_provider) = + crate::config().connect_host_provider().instrument(span.clone()).await + else { + span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + return; + }; + + // Check each host transaction's nonce + for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { + let host_tx = match decode_tx(host_tx_bytes) { + Some(tx) => tx, + None => { + span_debug!( + span, + idx, + "Failed to decode host transaction, dropping bundle" + ); + return; + } + }; + + let sender = match host_tx.recover_signer() { + Ok(s) => s, + Err(_) => { + span_debug!( + span, + idx, + "Failed to recover sender from host tx, dropping bundle" + ); + return; + } + }; + + let tx_count = match host_provider.get_transaction_count(sender).await { + Ok(count) => count, + Err(_) => { + span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); + return; + } + }; + + if host_tx.nonce() < tx_count { + debug!( + parent: &span, + %sender, + tx_nonce = %host_tx.nonce(), + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return; + } + } + + // All host txs have valid nonces, forward the bundle + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + }); + } + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -89,10 +174,7 @@ impl BundlePoller { if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; - } + Self::spawn_check_bundle_nonces(bundle, outbound.clone()); } } @@ -109,3 +191,8 @@ impl BundlePoller { (inbound, jh) } } + +/// Decodes a transaction from RLP-encoded bytes. +fn decode_tx(bytes: &Bytes) -> Option { + alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() +}