From 8c2d76d5827945baa716ed768fa96fa82d2e54ca Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 14 Jan 2026 10:45:01 -0500 Subject: [PATCH] dedup earlier --- crates/ingress-rpc/src/service.rs | 62 +++++++++++++++---------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 3605dd8..4f965e9 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -24,7 +24,6 @@ use tips_core::{ use tokio::sync::{broadcast, mpsc}; use tokio::time::{Duration, Instant, timeout}; use tracing::{debug, info, warn}; -use uuid::Uuid; use crate::metrics::{Metrics, record_histogram}; use crate::queue::{BundleQueuePublisher, MessageQueue, UserOpQueuePublisher}; @@ -86,7 +85,7 @@ pub struct IngressService { builder_backrun_tx: broadcast::Sender, max_backrun_txs: usize, max_backrun_gas_limit: u64, - bundle_cache: Cache, + bundle_cache: Cache, } impl IngressService { @@ -308,43 +307,40 @@ impl IngressApiServer for Ingre let bundle_hash = &parsed_bundle.bundle_hash(); - self.metrics.bundles_parsed.increment(1); - - let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await { - Ok(response) => { - info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response); - Some(response) - } - Err(e) => { - warn!( - bundle_hash = %bundle_hash, - error = %e, - "Metering failed for raw transaction" - ); - None - } - }; - - if let Some(meter_info) = meter_bundle_response.as_ref() { - self.metrics.successful_simulations.increment(1); - _ = self.builder_tx.send(meter_info.clone()); - } else { - self.metrics.failed_simulations.increment(1); - } - - let accepted_bundle = - AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default()); - - let bundle_id = *accepted_bundle.uuid(); - if self.bundle_cache.get(&bundle_id).await.is_some() { + if self.bundle_cache.get(bundle_hash).await.is_some() { debug!( message = "Duplicate bundle detected, skipping Kafka publish", - bundle_id = %bundle_id, bundle_hash = %bundle_hash, transaction_hash = %transaction.tx_hash(), ); } else { - self.bundle_cache.insert(bundle_id, ()).await; + self.bundle_cache.insert(*bundle_hash, ()).await; + self.metrics.bundles_parsed.increment(1); + + let meter_bundle_response = match self.meter_bundle(&bundle, bundle_hash).await { + Ok(response) => { + info!(message = "Metering succeeded for raw transaction", bundle_hash = %bundle_hash, response = ?response); + Some(response) + } + Err(e) => { + warn!( + bundle_hash = %bundle_hash, + error = %e, + "Metering failed for raw transaction" + ); + None + } + }; + + if let Some(meter_info) = meter_bundle_response.as_ref() { + self.metrics.successful_simulations.increment(1); + _ = self.builder_tx.send(meter_info.clone()); + } else { + self.metrics.failed_simulations.increment(1); + } + + let accepted_bundle = + AcceptedBundle::new(parsed_bundle, meter_bundle_response.unwrap_or_default()); if send_to_kafka { if let Err(e) = self