// file: kb_lib/src/local_pipeline_replay.rs //! Local pipeline replay from already persisted raw transaction data. //! //! This service does not fetch historical transactions from Solana RPC. It //! reuses rows already present in `k_sol_chain_transactions` and replays the //! deterministic local pipeline over their signatures. const LOCAL_PIPELINE_DEX_DECODER_SCOPE: &str = "dex_decode.local_pipeline"; const LOCAL_PIPELINE_DEX_DECODER_VERSION: &str = "dex_decode.v0.7.46.damm_v1_events1"; fn default_skip_certified_dex_decode() -> bool { return true; } /// Configuration for a local pipeline replay pass. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct LocalPipelineReplayConfig { /// Maximum number of persisted transactions to replay. pub limit: std::option::Option, /// Whether token metadata backfill should run after local replay. pub refresh_missing_token_metadata: bool, /// Maximum number of missing token metadata rows to resolve. pub token_metadata_limit: std::option::Option, /// Whether locally replayed market materialization tables are reset before replay. pub reset_market_materialization_before_replay: bool, /// Whether DEX decoding may be skipped when the replay ledger certifies it is safe. #[serde(default = "default_skip_certified_dex_decode")] pub skip_certified_dex_decode: bool, /// Whether DEX decoding must run even when the replay ledger certifies a safe prior pass. #[serde(default)] pub force_decode_replay: bool, } impl Default for LocalPipelineReplayConfig { fn default() -> Self { return Self { limit: Some(10_000), refresh_missing_token_metadata: false, token_metadata_limit: Some(250), reset_market_materialization_before_replay: true, skip_certified_dex_decode: true, force_decode_replay: false, }; } } /// Summary of a local pipeline replay pass. #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct LocalPipelineReplayResult { /// Number of transaction signatures selected for replay. pub selected_transaction_count: usize, /// Number of transactions replayed without a fatal per-signature error. pub replayed_transaction_count: usize, /// Number of transactions that produced a decode error. pub decode_error_count: usize, /// Number of transactions that produced a detect error. pub detect_error_count: usize, /// Number of transactions that produced a trade aggregation error. pub trade_aggregation_error_count: usize, /// Number of transactions that produced a non-trade materialization error. pub non_trade_materialization_error_count: usize, /// Number of transactions that produced a candle aggregation error. pub pair_candle_error_count: usize, /// Number of transactions that produced an analytic signal error. pub analytic_signal_error_count: usize, /// Total decoded events returned by replayed decode calls. pub decoded_event_count: usize, /// Number of transactions where DEX decoding was skipped through the replay ledger. pub decode_skipped_count: usize, /// Number of persisted decoded events covered by skipped decode ledger rows. pub decode_skipped_event_count: usize, /// Number of replay ledger rows upserted by this replay pass. pub decode_ledger_upsert_count: usize, /// Number of replay ledger rows marked unsafe for future decode skip. pub decode_ledger_unsafe_count: usize, /// Total detection results returned by replayed detect calls. pub detection_count: usize, /// Total trade aggregation results returned by replayed aggregation calls. pub trade_event_count: usize, /// Total liquidity event materialization results returned by replayed non-trade calls. pub liquidity_event_count: usize, /// Total pool lifecycle event materialization results returned by replayed non-trade calls. pub pool_lifecycle_event_count: usize, /// Total fee event materialization results returned by replayed non-trade calls. pub fee_event_count: usize, /// Total reward event materialization results returned by replayed non-trade calls. pub reward_event_count: usize, /// Total pool administration event materialization results returned by replayed non-trade calls. pub pool_admin_event_count: usize, /// Total candle upsert results returned by replayed candle calls. /// /// This is a replay write/result counter, not the number of distinct rows /// currently persisted in `k_sol_pair_candles`. Use local diagnostics for the /// persisted row count. pub pair_candle_upsert_count: usize, /// Total analytic signal upsert results returned by replayed analytic calls. /// /// This is a replay write/result counter, not the number of distinct rows /// currently persisted in the analytic signal table. pub analytic_signal_upsert_count: usize, /// Total transaction classification rows upserted during replay. pub transaction_classification_count: usize, /// Number of transactions that produced a classification error. pub transaction_classification_error_count: usize, /// Number of token metadata rows updated after replay. pub token_metadata_updated_count: usize, /// Number of pair symbols updated after replay. pub pair_symbol_updated_count: usize, /// Number of derived market materialization rows deleted before replay. pub reset_market_materialization_deleted_count: u64, /// Number of errors outside per-signature replay. pub global_error_count: usize, } /// Local pipeline replay service. #[derive(Debug, Clone)] pub struct LocalPipelineReplayService { database: std::sync::Arc, http_pool: std::option::Option>, http_role: std::string::String, } impl LocalPipelineReplayService { /// Creates a new local-only pipeline replay service. pub fn new(database: std::sync::Arc) -> Self { return Self { database, http_pool: None, http_role: "local_metadata".to_string(), }; } /// Creates a new pipeline replay service able to refresh token metadata over Solana HTTP RPC. pub fn new_with_http_pool( http_pool: std::sync::Arc, database: std::sync::Arc, http_role: std::string::String, ) -> Self { return Self { database, http_pool: Some(http_pool), http_role, }; } /// Replays the local pipeline from persisted raw chain transaction rows. pub async fn replay_local_pipeline( &self, config: &crate::LocalPipelineReplayConfig, ) -> Result { let signatures_result = crate::query_chain_transactions_list_signatures_for_replay( self.database.as_ref(), config.limit, ) .await; let signatures = match signatures_result { Ok(signatures) => signatures, Err(error) => return Err(error), }; let mut reset_market_materialization_deleted_count = 0_u64; if config.reset_market_materialization_before_replay { let reset_result = crate::query_trade_market_materialization_delete_all(self.database.as_ref()).await; reset_market_materialization_deleted_count = match reset_result { Ok(deleted_count) => deleted_count, Err(error) => return Err(error), }; tracing::debug!( deleted_count = reset_market_materialization_deleted_count, "local pipeline replay reset market materialization tables" ); } let dex_decode = crate::DexDecodeService::new(self.database.clone()); let dex_detect = crate::DexDetectService::new(self.database.clone()); let trade_aggregation = crate::TradeAggregationService::new(self.database.clone()); let non_trade_materialization = crate::NonTradeEventMaterializationService::new(self.database.clone()); let pair_candle_aggregation = crate::PairCandleAggregationService::new(self.database.clone()); let pair_analytic_signal = crate::PairAnalyticSignalService::new(self.database.clone()); let transaction_classification = crate::TransactionClassificationService::new(self.database.clone()); let mut result = LocalPipelineReplayResult { selected_transaction_count: signatures.len(), reset_market_materialization_deleted_count, ..Default::default() }; for signature in signatures { tracing::debug!( signature = %signature, "replaying local pipeline for persisted transaction" ); let transaction_result = crate::query_chain_transactions_get_by_signature( self.database.as_ref(), signature.as_str(), ) .await; let transaction = match transaction_result { Ok(Some(transaction)) => transaction, Ok(None) => { result.global_error_count += 1; tracing::warn!( signature = %signature, "local pipeline replay transaction row disappeared before replay" ); continue; }, Err(error) => { result.global_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay transaction lookup failed" ); continue; }, }; let transaction_id = match transaction.id { Some(transaction_id) => transaction_id, None => { result.global_error_count += 1; tracing::warn!( signature = %signature, "local pipeline replay transaction row has no persisted id" ); continue; }, }; let decode_skip_ledger_result = self .get_certified_dex_decode_skip_ledger(config, transaction_id, signature.as_str()) .await; let decode_skip_ledger = match decode_skip_ledger_result { Ok(decode_skip_ledger) => decode_skip_ledger, Err(error) => return Err(error), }; match decode_skip_ledger { Some(ledger) => { result.decode_skipped_count += 1; let ledger_event_count = usize::try_from(ledger.event_count); match ledger_event_count { Ok(event_count) => { result.decode_skipped_event_count += event_count; }, Err(error) => { result.global_error_count += 1; tracing::warn!( signature = %signature, event_count = ledger.event_count, error = %error, "local pipeline replay could not convert skipped event count" ); }, } tracing::debug!( signature = %signature, event_count = ledger.event_count, decoder_version = %ledger.decoder_version, "local pipeline replay skipped certified DEX decode step" ); }, None => { let decode_result = dex_decode.decode_transaction_by_signature(signature.as_str()).await; match decode_result { Ok(decoded_events) => { result.decoded_event_count += decoded_events.len(); let ledger_result = self .record_dex_decode_replay_ledger( transaction_id, signature.as_str(), &decoded_events, ) .await; match ledger_result { Ok(ledger) => { result.decode_ledger_upsert_count += 1; if ledger.force_replay_required { result.decode_ledger_unsafe_count += 1; } }, Err(error) => { result.global_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay could not record successful decode ledger row" ); }, } }, Err(error) => { result.decode_error_count += 1; let ledger_result = self .record_failed_dex_decode_replay_ledger( transaction_id, signature.as_str(), error.to_string(), ) .await; match ledger_result { Ok(_) => { result.decode_ledger_upsert_count += 1; result.decode_ledger_unsafe_count += 1; }, Err(ledger_error) => { result.global_error_count += 1; tracing::warn!( signature = %signature, error = %ledger_error, "local pipeline replay could not record failed decode ledger row" ); }, } tracing::warn!( signature = %signature, error = %error, "local pipeline replay decode step failed" ); continue; }, } }, } let detect_result = dex_detect.detect_transaction_by_signature(signature.as_str()).await; match detect_result { Ok(detections) => { result.detection_count += detections.len(); }, Err(error) => { result.detect_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay detect step failed; continuing with aggregation" ); }, } let non_trade_result = non_trade_materialization .record_transaction_by_signature(signature.as_str()) .await; match non_trade_result { Ok(non_trade_result) => { result.liquidity_event_count += non_trade_result.liquidity_event_count; result.pool_lifecycle_event_count += non_trade_result.pool_lifecycle_event_count; result.fee_event_count += non_trade_result.fee_event_count; result.reward_event_count += non_trade_result.reward_event_count; result.pool_admin_event_count += non_trade_result.pool_admin_event_count; }, Err(error) => { result.non_trade_materialization_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay non-trade materialization step failed" ); continue; }, } let trade_result = trade_aggregation.record_transaction_by_signature(signature.as_str()).await; match trade_result { Ok(trade_results) => { result.trade_event_count += trade_results.len(); }, Err(error) => { result.trade_aggregation_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay trade aggregation step failed" ); continue; }, } let candle_result = pair_candle_aggregation .record_transaction_by_signature(signature.as_str()) .await; match candle_result { Ok(candle_results) => { result.pair_candle_upsert_count += candle_results.len(); }, Err(error) => { result.pair_candle_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay candle aggregation step failed" ); }, } let analytic_result = pair_analytic_signal.record_transaction_by_signature(signature.as_str()).await; match analytic_result { Ok(analytic_results) => { result.analytic_signal_upsert_count += analytic_results.len(); }, Err(error) => { result.analytic_signal_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay analytic signal step failed" ); }, } let classification_result = transaction_classification .classify_transaction_by_signature(signature.as_str()) .await; match classification_result { Ok(_) => { result.transaction_classification_count += 1; }, Err(error) => { result.transaction_classification_error_count += 1; tracing::warn!( signature = %signature, error = %error, "local pipeline replay transaction classification step failed" ); }, } result.replayed_transaction_count += 1; } if config.refresh_missing_token_metadata { let metadata_service = match &self.http_pool { Some(http_pool) => crate::TokenMetadataBackfillService::new( http_pool.clone(), self.database.clone(), self.http_role.clone(), ), None => crate::TokenMetadataBackfillService::new_local(self.database.clone()), }; let metadata_result = metadata_service .backfill_missing_token_metadata(config.token_metadata_limit) .await; match metadata_result { Ok(metadata_result) => { result.token_metadata_updated_count += metadata_result.updated_token_count; }, Err(error) => { result.global_error_count += 1; tracing::warn!( error = %error, "token metadata refresh failed after local pipeline replay" ); }, } } return Ok(result); } async fn get_certified_dex_decode_skip_ledger( &self, config: &crate::LocalPipelineReplayConfig, transaction_id: i64, signature: &str, ) -> Result, crate::Error> { if config.force_decode_replay { return Ok(None); } if !config.skip_certified_dex_decode { return Ok(None); } let ledger_result = crate::query_dex_decode_replay_ledger_get_by_transaction( self.database.as_ref(), transaction_id, LOCAL_PIPELINE_DEX_DECODER_SCOPE, LOCAL_PIPELINE_DEX_DECODER_VERSION, ) .await; let ledger_option = match ledger_result { Ok(ledger_option) => ledger_option, Err(error) => return Err(error), }; match ledger_option { Some(ledger) => { if ledger.can_skip_decode() { let persisted_count_result = self .count_persisted_decoded_events_for_skip(transaction_id, signature) .await; let persisted_count = match persisted_count_result { Ok(persisted_count) => persisted_count, Err(error) => return Err(error), }; if persisted_count >= ledger.event_count { return Ok(Some(ledger)); } tracing::debug!( signature = %signature, ledger_event_count = ledger.event_count, persisted_event_count = persisted_count, "local pipeline replay ledger is certified but persisted decoded events are missing" ); return Ok(None); } tracing::debug!( signature = %signature, decode_status = %ledger.decode_status, certainty = %ledger.certainty, force_replay_required = ledger.force_replay_required, "local pipeline replay ledger requires DEX decode" ); return Ok(None); }, None => return Ok(None), } } async fn count_persisted_decoded_events_for_skip( &self, transaction_id: i64, signature: &str, ) -> Result { let events_result = crate::query_dex_decoded_events_list_by_transaction_id( self.database.as_ref(), transaction_id, ) .await; let events = match events_result { Ok(events) => events, Err(error) => return Err(error), }; let count_result = i64::try_from(events.len()); match count_result { Ok(count) => return Ok(count), Err(error) => { return Err(crate::Error::Db(format!( "cannot convert persisted decoded event count for signature '{}' to i64: {}", signature, error ))); }, } } async fn record_dex_decode_replay_ledger( &self, transaction_id: i64, signature: &str, decoded_events: &[crate::DexDecodedEventDto], ) -> Result { let ledger_result = build_success_dex_decode_replay_ledger(transaction_id, signature, decoded_events); let ledger = match ledger_result { Ok(ledger) => ledger, Err(error) => return Err(error), }; let upsert_result = crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await; match upsert_result { Ok(_) => return Ok(ledger), Err(error) => return Err(error), } } async fn record_failed_dex_decode_replay_ledger( &self, transaction_id: i64, signature: &str, error_message: std::string::String, ) -> Result { let ledger = crate::DexDecodeReplayLedgerDto::new( transaction_id, signature.to_string(), LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(), LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(), crate::DexDecodeReplayLedgerDto::STATUS_FAILED.to_string(), crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string(), 0, 0, true, Some(format!("decode failed: {error_message}")), ); let upsert_result = crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await; match upsert_result { Ok(_) => return Ok(ledger), Err(error) => return Err(error), } } } fn build_success_dex_decode_replay_ledger( transaction_id: i64, signature: &str, decoded_events: &[crate::DexDecodedEventDto], ) -> Result { let event_count_result = i64::try_from(decoded_events.len()); let event_count = match event_count_result { Ok(event_count) => event_count, Err(error) => { return Err(crate::Error::Db(format!( "cannot convert decoded event count '{}' to i64: {}", decoded_events.len(), error ))); }, }; let effective_event_count_usize = count_effective_decoded_events(decoded_events); let effective_event_count_result = i64::try_from(effective_event_count_usize); let effective_event_count = match effective_event_count_result { Ok(effective_event_count) => effective_event_count, Err(error) => { return Err(crate::Error::Db(format!( "cannot convert effective decoded event count '{}' to i64: {}", effective_event_count_usize, error ))); }, }; let instruction_audit_count = event_count - effective_event_count; let distinct_token_mint_count_usize = count_distinct_decoded_event_token_mints(decoded_events); let distinct_token_mint_count_result = i64::try_from(distinct_token_mint_count_usize); let distinct_token_mint_count = match distinct_token_mint_count_result { Ok(distinct_token_mint_count) => distinct_token_mint_count, Err(error) => { return Err(crate::Error::Db(format!( "cannot convert distinct token mint count '{}' to i64: {}", distinct_token_mint_count_usize, error ))); }, }; let force_replay_required = effective_event_count > 1 || distinct_token_mint_count > 2; let decode_status = if event_count == 0 { crate::DexDecodeReplayLedgerDto::STATUS_NO_EVENTS.to_string() } else { crate::DexDecodeReplayLedgerDto::STATUS_DECODED.to_string() }; let certainty = if force_replay_required { crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string() } else { crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE.to_string() }; let status_reason = build_dex_decode_replay_ledger_status_reason( event_count, effective_event_count, instruction_audit_count, distinct_token_mint_count, force_replay_required, ); return Ok(crate::DexDecodeReplayLedgerDto::new( transaction_id, signature.to_string(), LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(), LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(), decode_status, certainty, event_count, distinct_token_mint_count, force_replay_required, Some(status_reason), )); } fn count_effective_decoded_events(decoded_events: &[crate::DexDecodedEventDto]) -> usize { let mut count = 0_usize; for event in decoded_events { if is_replay_audit_only_event(event) { continue; } count += 1; } return count; } fn is_replay_audit_only_event(event: &crate::DexDecodedEventDto) -> bool { if event.event_kind.ends_with(".instruction_audit") { return true; } if event.event_kind.ends_with("_audit") { return true; } if event.event_kind == crate::UPSTREAM_REGISTRY_INSTRUCTION_MATCH_EVENT_KIND { return true; } return false; } fn count_distinct_decoded_event_token_mints(decoded_events: &[crate::DexDecodedEventDto]) -> usize { let mut mints = std::collections::BTreeSet::::new(); for event in decoded_events { insert_optional_mint(&mut mints, &event.lp_mint); insert_optional_mint(&mut mints, &event.token_a_mint); insert_optional_mint(&mut mints, &event.token_b_mint); } return mints.len(); } fn insert_optional_mint( mints: &mut std::collections::BTreeSet, mint_option: &std::option::Option, ) { if let Some(mint) = mint_option { let trimmed = mint.trim(); if !trimmed.is_empty() { mints.insert(trimmed.to_string()); } } } fn build_dex_decode_replay_ledger_status_reason( event_count: i64, effective_event_count: i64, instruction_audit_count: i64, distinct_token_mint_count: i64, force_replay_required: bool, ) -> std::string::String { if event_count == 0 { return "decode completed with no persisted DEX event".to_string(); } if force_replay_required { return format!( "decode completed but remains unsafe for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}" ); } return format!( "decode completed and certified for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}" ); } /// Replays the local pipeline from persisted raw chain transaction rows. pub async fn replay_local_pipeline( database: std::sync::Arc, config: &crate::LocalPipelineReplayConfig, ) -> Result { let service = crate::LocalPipelineReplayService::new(database); return service.replay_local_pipeline(config).await; } #[cfg(test)] mod tests { fn make_decoded_event( event_kind: &str, token_a_mint: std::option::Option<&str>, token_b_mint: std::option::Option<&str>, ) -> crate::DexDecodedEventDto { return crate::DexDecodedEventDto::new( 1, Some(10), "meteora_dlmm".to_string(), crate::METEORA_DLMM_PROGRAM_ID.to_string(), event_kind.to_string(), Some("pool".to_string()), None, token_a_mint.map(|value| return value.to_string()), token_b_mint.map(|value| return value.to_string()), None, "{}".to_string(), ); } #[test] fn ledger_certifies_one_effective_event_with_instruction_audits() { let events = vec![ make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")), make_decoded_event("meteora_dlmm.instruction_audit", None, None), make_decoded_event("meteora_dlmm.instruction_audit", None, None), ]; let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice()) .expect("ledger must build"); assert_eq!(ledger.event_count, 3); assert_eq!(ledger.distinct_token_mint_count, 2); assert!(!ledger.force_replay_required); assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE); assert!(ledger.can_skip_decode()); } #[test] fn ledger_treats_audit_suffix_events_as_audit_only() { let events = vec![ make_decoded_event("openbook_v2.settle_funds_audit", Some("mint-a"), Some("mint-b")), make_decoded_event("openbook_v2.order_place_audit", None, None), ]; let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice()) .expect("ledger must build"); assert_eq!(ledger.event_count, 2); assert_eq!(ledger.status_reason.as_deref(), Some("decode completed and certified for skip: event_count=2, effective_event_count=0, instruction_audit_count=2, distinct_token_mint_count=2")); assert!(!ledger.force_replay_required); assert!(ledger.can_skip_decode()); } #[test] fn ledger_keeps_multiple_effective_events_unsafe() { let events = vec![ make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")), make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")), make_decoded_event("meteora_dlmm.instruction_audit", None, None), ]; let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice()) .expect("ledger must build"); assert_eq!(ledger.event_count, 3); assert!(ledger.force_replay_required); assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE); assert!(!ledger.can_skip_decode()); } }