Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 92 additions & 5 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use alloy::{
consensus::{Transaction, transaction::SignerRecoverable},
eips::Decodable2718,
primitives::Bytes,
providers::Provider,
rlp::Buf,
};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, error, trace, trace_span};
use tracing::{Instrument, debug, debug_span, error, trace, trace_span};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -71,6 +78,84 @@ impl BundlePoller {
}
}

/// Spawns a tokio task to check the nonces of all host transactions in a bundle
/// before sending it to the cache task via the outbound channel.
///
/// Bundles with stale host transaction nonces are dropped to prevent them from
/// entering the SimCache, failing simulation, and being re-ingested on the next poll.
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should re-use existing validity checks from crates/sim/src/cache/item.rs

tokio::spawn(async move {
let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);

// If no host transactions, forward directly
if bundle.bundle.host_txs.is_empty() {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
return;
}

let Ok(host_provider) =
crate::config().connect_host_provider().instrument(span.clone()).await
else {
span_debug!(span, "Failed to connect to host provider, stopping nonce check task");
return;
};

// Check each host transaction's nonce
for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the loop should be a producer of futures that are then run concurrently via FuturesUnordered or similar

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with cancellation after the first failed future

let host_tx = match decode_tx(host_tx_bytes) {
Some(tx) => tx,
None => {
span_debug!(
span,
idx,
"Failed to decode host transaction, dropping bundle"
);
return;
}
};

let sender = match host_tx.recover_signer() {
Ok(s) => s,
Err(_) => {
span_debug!(
span,
idx,
"Failed to recover sender from host tx, dropping bundle"
);
return;
}
};

let tx_count = match host_provider.get_transaction_count(sender).await {
Ok(count) => count,
Err(_) => {
span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle");
return;
}
};

if host_tx.nonce() < tx_count {
debug!(
parent: &span,
%sender,
tx_nonce = %host_tx.nonce(),
host_nonce = %tx_count,
idx,
"Dropping bundle with stale host tx nonce"
);
return;
}
}

// All host txs have valid nonces, forward the bundle
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
Expand All @@ -89,10 +174,7 @@ impl BundlePoller {

if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
}
}

Expand All @@ -109,3 +191,8 @@ impl BundlePoller {
(inbound, jh)
}
}

/// Decodes a transaction from RLP-encoded bytes.
fn decode_tx(bytes: &Bytes) -> Option<alloy::consensus::TxEnvelope> {
alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok()
}