-
Notifications
You must be signed in to change notification settings - Fork 2
feat(bundle): filter bundles with stale host tx nonces before SimCache #235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
init4samwise
wants to merge
2
commits into
main
Choose a base branch
from
samwise/eng-1937-bundle-nonce-filter
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+92
−5
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,20 @@ | ||
| //! Bundler service responsible for fetching bundles and sending them to the simulator. | ||
| use crate::config::BuilderConfig; | ||
| use alloy::{ | ||
| consensus::{Transaction, transaction::SignerRecoverable}, | ||
| eips::Decodable2718, | ||
| primitives::Bytes, | ||
| providers::Provider, | ||
| rlp::Buf, | ||
| }; | ||
| use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; | ||
| use signet_tx_cache::{TxCacheError, types::CachedBundle}; | ||
| use tokio::{ | ||
| sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, | ||
| task::JoinHandle, | ||
| time::{self, Duration}, | ||
| }; | ||
| use tracing::{Instrument, error, trace, trace_span}; | ||
| use tracing::{Instrument, debug, debug_span, error, trace, trace_span}; | ||
|
|
||
| /// Poll interval for the bundle poller in milliseconds. | ||
| const POLL_INTERVAL_MS: u64 = 1000; | ||
|
|
@@ -71,6 +78,84 @@ impl BundlePoller { | |
| } | ||
| } | ||
|
|
||
| /// Spawns a tokio task to check the nonces of all host transactions in a bundle | ||
| /// before sending it to the cache task via the outbound channel. | ||
| /// | ||
| /// Bundles with stale host transaction nonces are dropped to prevent them from | ||
| /// entering the SimCache, failing simulation, and being re-ingested on the next poll. | ||
| fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) { | ||
| tokio::spawn(async move { | ||
| let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); | ||
|
|
||
| // If no host transactions, forward directly | ||
| if bundle.bundle.host_txs.is_empty() { | ||
| if outbound.send(bundle).is_err() { | ||
| span_debug!(span, "Outbound channel closed, stopping nonce check task"); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| let Ok(host_provider) = | ||
| crate::config().connect_host_provider().instrument(span.clone()).await | ||
| else { | ||
| span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); | ||
| return; | ||
| }; | ||
|
|
||
| // Check each host transaction's nonce | ||
| for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the loop should be a producer of futures that are then run concurrently via FuturesUnordered or similar
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with cancellation after the first failed future |
||
| let host_tx = match decode_tx(host_tx_bytes) { | ||
| Some(tx) => tx, | ||
| None => { | ||
| span_debug!( | ||
| span, | ||
| idx, | ||
| "Failed to decode host transaction, dropping bundle" | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let sender = match host_tx.recover_signer() { | ||
| Ok(s) => s, | ||
| Err(_) => { | ||
| span_debug!( | ||
| span, | ||
| idx, | ||
| "Failed to recover sender from host tx, dropping bundle" | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let tx_count = match host_provider.get_transaction_count(sender).await { | ||
| Ok(count) => count, | ||
| Err(_) => { | ||
| span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| if host_tx.nonce() < tx_count { | ||
| debug!( | ||
| parent: &span, | ||
| %sender, | ||
| tx_nonce = %host_tx.nonce(), | ||
| host_nonce = %tx_count, | ||
| idx, | ||
| "Dropping bundle with stale host tx nonce" | ||
| ); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| // All host txs have valid nonces, forward the bundle | ||
| if outbound.send(bundle).is_err() { | ||
| span_debug!(span, "Outbound channel closed, stopping nonce check task"); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| async fn task_future(self, outbound: UnboundedSender<CachedBundle>) { | ||
| loop { | ||
| let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); | ||
|
|
@@ -89,10 +174,7 @@ impl BundlePoller { | |
|
|
||
| if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { | ||
| for bundle in bundles.into_iter() { | ||
| if let Err(err) = outbound.send(bundle) { | ||
| span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); | ||
| break; | ||
| } | ||
| Self::spawn_check_bundle_nonces(bundle, outbound.clone()); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -109,3 +191,8 @@ impl BundlePoller { | |
| (inbound, jh) | ||
| } | ||
| } | ||
|
|
||
| /// Decodes a transaction from RLP-encoded bytes. | ||
| fn decode_tx(bytes: &Bytes) -> Option<alloy::consensus::TxEnvelope> { | ||
| alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should re-use existing validity checks from
crates/sim/src/cache/item.rs