From 5f815ad58c7230f2aa1c58178a47f36d28672c59 Mon Sep 17 00:00:00 2001 From: init4samwise Date: Tue, 3 Mar 2026 04:48:07 +0000 Subject: [PATCH 1/2] feat(bundle): filter bundles with stale host tx nonces before SimCache Adds nonce checking for host transactions in BundlePoller, similar to the existing TxPoller pattern. Bundles with stale host tx nonces are dropped before entering SimCache to prevent: - Wasted simulation cycles on bundles that will fail - ERROR log spam from nonce-too-low failures - Re-ingestion churn (~1s poll cycle) Each host transaction's nonce is compared against the sender's current nonce from the host provider. If any host tx has a stale nonce, the entire bundle is dropped with DEBUG-level logging. Closes ENG-1937 --- src/tasks/cache/bundle.rs | 89 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index e1d61726..d520dbf5 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,12 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::{ + consensus::{Transaction, transaction::SignerRecoverable}, + eips::Decodable2718, + primitives::Bytes, + providers::Provider, + rlp::Buf, +}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ @@ -7,7 +14,7 @@ use tokio::{ task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, error, trace, trace_span}; +use tracing::{Instrument, debug, debug_span, error, trace, trace_span}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -71,6 +78,76 @@ impl BundlePoller { } } + /// Spawns a tokio task to check the nonces of all host transactions in a bundle + /// before sending it to the cache task via the outbound channel. + /// + /// Bundles with stale host transaction nonces are dropped to prevent them from + /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + tokio::spawn(async move { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + + // If no host transactions, forward directly + if bundle.bundle.host_txs.is_empty() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + return; + } + + let Ok(host_provider) = + crate::config().connect_host_provider().instrument(span.clone()).await + else { + span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + return; + }; + + // Check each host transaction's nonce + for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { + let host_tx = match decode_tx(host_tx_bytes) { + Some(tx) => tx, + None => { + span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + return; + } + }; + + let sender = match host_tx.recover_signer() { + Ok(s) => s, + Err(_) => { + span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + return; + } + }; + + let tx_count = match host_provider.get_transaction_count(sender).await { + Ok(count) => count, + Err(_) => { + span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); + return; + } + }; + + if host_tx.nonce() < tx_count { + debug!( + parent: &span, + %sender, + tx_nonce = %host_tx.nonce(), + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return; + } + } + + // All host txs have valid nonces, forward the bundle + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + }); + } + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -89,10 +166,7 @@ impl BundlePoller { if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; - } + Self::spawn_check_bundle_nonces(bundle, outbound.clone()); } } @@ -109,3 +183,8 @@ impl BundlePoller { (inbound, jh) } } + +/// Decodes a transaction from RLP-encoded bytes. +fn decode_tx(bytes: &Bytes) -> Option { + alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() +} From ece4913e18dfa34766c164a28c4173c3fe34b68c Mon Sep 17 00:00:00 2001 From: init4samwise Date: Thu, 5 Mar 2026 00:12:52 +0000 Subject: [PATCH 2/2] style: run cargo fmt --- src/tasks/cache/bundle.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index d520dbf5..631861ca 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -107,7 +107,11 @@ impl BundlePoller { let host_tx = match decode_tx(host_tx_bytes) { Some(tx) => tx, None => { - span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + span_debug!( + span, + idx, + "Failed to decode host transaction, dropping bundle" + ); return; } }; @@ -115,7 +119,11 @@ impl BundlePoller { let sender = match host_tx.recover_signer() { Ok(s) => s, Err(_) => { - span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + span_debug!( + span, + idx, + "Failed to recover sender from host tx, dropping bundle" + ); return; } };