// file: kb_lib/src/token_backfill.rs
//! Historical token backfill service.
//!
//! [`TokenBackfillService`] offers three entry points: backfill by token mint, by pool
//! address, and by a single transaction signature. Each one fetches transactions over
//! HTTP RPC, replays them through the same decode / detect / attribution / aggregation
//! services used by the rest of the crate, refreshes token metadata and DEX event
//! coverage on a best-effort basis, and finally records a `*.backfill.completed`
//! observation together with a low-severity signal carrying the summary counters.

/// One token-backfill result summary.
///
/// Returned by `TokenBackfillService::backfill_token_by_mint`. The same counters are
/// also written, under camelCase keys, into the completion observation payload.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TokenBackfillResult {
    /// Input token mint.
    pub token_mint: std::string::String,
    /// Number of signatures returned directly for the mint.
    pub mint_signature_count: usize,
    /// Number of pool addresses discovered for the token after the first replay pass.
    pub pool_address_count: usize,
    /// Number of signatures returned from those pool addresses.
    ///
    /// Summed over all pools, before de-duplication.
    pub pool_signature_count: usize,
    /// Number of unique signatures processed during this run.
    ///
    /// Signatures are de-duplicated across the mint scan and every pool scan.
    pub unique_signature_count: usize,
    /// Number of transactions resolved through HTTP during this run.
    pub resolved_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup returned `null`.
    pub missing_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup failed after retries.
    pub transaction_fetch_error_count: usize,
    /// Last transaction fetch error observed during this run, if any.
    ///
    /// Only the most recent error message is kept; earlier ones are overwritten.
    pub last_transaction_fetch_error: std::option::Option,
    /// Total number of decoded DEX events replayed during this run.
    pub decoded_event_count: usize,
    /// Total number of DEX detection results produced during this run.
    pub detection_count: usize,
    /// Total number of launch-attribution results produced during this run.
    pub launch_attribution_count: usize,
    /// Total number of pool-origin results produced during this run.
    pub pool_origin_count: usize,
    /// Total number of wallet-participation observations produced during this run.
    pub wallet_participation_count: usize,
    /// Total number of trade-aggregation results produced during this run.
    pub trade_event_count: usize,
    /// Total number of liquidity event materialization results produced during this run.
    pub liquidity_event_count: usize,
    /// Total number of pool lifecycle event materialization results produced during this run.
    pub pool_lifecycle_event_count: usize,
    /// Total number of fee event materialization results produced during this run.
    pub fee_event_count: usize,
    /// Total number of reward event materialization results produced during this run.
    pub reward_event_count: usize,
    /// Total number of pool administration event materialization results produced during this run.
    pub pool_admin_event_count: usize,
    /// Total number of pair-candle aggregation results produced during this run.
    pub pair_candle_count: usize,
}

/// One pool-backfill result summary.
///
/// Returned by `TokenBackfillService::backfill_pool_by_address`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolBackfillResult {
    /// Input pool address.
    pub pool_address: std::string::String,
    /// Number of signatures returned directly for the pool address.
    ///
    /// Signatures listed for the pool's vault addresses are not counted here; they only
    /// contribute to `unique_signature_count` and the replay counters.
    pub pool_signature_count: usize,
    /// Number of unique signatures processed during this run.
    pub unique_signature_count: usize,
    /// Number of transactions resolved through HTTP during this run.
    pub resolved_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup returned `null`.
    pub missing_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup failed after retries.
    pub transaction_fetch_error_count: usize,
    /// Last transaction fetch error observed during this run, if any.
    ///
    /// Only the most recent error message is kept; earlier ones are overwritten.
    pub last_transaction_fetch_error: std::option::Option,
    /// Total number of decoded DEX events replayed during this run.
    pub decoded_event_count: usize,
    /// Total number of DEX detection results produced during this run.
    pub detection_count: usize,
    /// Total number of launch-attribution results produced during this run.
    pub launch_attribution_count: usize,
    /// Total number of pool-origin results produced during this run.
    pub pool_origin_count: usize,
    /// Total number of wallet-participation observations produced during this run.
    pub wallet_participation_count: usize,
    /// Total number of trade-aggregation results produced during this run.
    pub trade_event_count: usize,
    /// Total number of liquidity event materialization results produced during this run.
    pub liquidity_event_count: usize,
    /// Total number of pool lifecycle event materialization results produced during this run.
    pub pool_lifecycle_event_count: usize,
    /// Total number of fee event materialization results produced during this run.
    pub fee_event_count: usize,
    /// Total number of reward event materialization results produced during this run.
    pub reward_event_count: usize,
    /// Total number of pool administration event materialization results produced during this run.
    pub pool_admin_event_count: usize,
    /// Total number of pair-candle aggregation results produced during this run.
    pub pair_candle_count: usize,
}

/// One signature-backfill result summary.
///
/// Returned by `TokenBackfillService::backfill_signature`. Because exactly one
/// transaction is replayed, at most one of `resolved_transaction_count`,
/// `missing_transaction_count`, and `transaction_fetch_error_count` is non-zero.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SignatureBackfillResult {
    /// Input transaction signature.
    pub signature: std::string::String,
    /// Number of transactions resolved through HTTP during this run.
    pub resolved_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup returned `null`.
    pub missing_transaction_count: usize,
    /// Number of signatures whose `getTransaction` lookup failed after retries.
    pub transaction_fetch_error_count: usize,
    /// Last transaction fetch error observed during this run, if any.
    pub last_transaction_fetch_error: std::option::Option,
    /// Total number of decoded DEX events replayed during this run.
    pub decoded_event_count: usize,
    /// Total number of DEX detection results produced during this run.
    pub detection_count: usize,
    /// Total number of launch-attribution results produced during this run.
    pub launch_attribution_count: usize,
    /// Total number of pool-origin results produced during this run.
    pub pool_origin_count: usize,
    /// Total number of wallet-participation observations produced during this run.
    pub wallet_participation_count: usize,
    /// Total number of trade-aggregation results produced during this run.
    pub trade_event_count: usize,
    /// Total number of liquidity event materialization results produced during this run.
    pub liquidity_event_count: usize,
    /// Total number of pool lifecycle event materialization results produced during this run.
    pub pool_lifecycle_event_count: usize,
    /// Total number of fee event materialization results produced during this run.
    pub fee_event_count: usize,
    /// Total number of reward event materialization results produced during this run.
    pub reward_event_count: usize,
    /// Total number of pool administration event materialization results produced during this run.
    pub pool_admin_event_count: usize,
    /// Total number of pair-candle aggregation results produced during this run.
    pub pair_candle_count: usize,
}

/// Historical token backfill service.
///
/// This service reuses the existing transaction projection and downstream
/// DEX pipeline instead of introducing a separate historical code path.
#[derive(Debug, Clone)]
pub struct TokenBackfillService {
    /// Shared HTTP RPC pool used for `getSignaturesForAddress` and `getTransaction` calls.
    http_pool: std::sync::Arc,
    /// Shared database handle; every pipeline service below is built from a clone of it.
    database: std::sync::Arc,
    /// Records the `*.backfill.completed` observations and signals.
    persistence: crate::DetectionPersistenceService,
    /// Endpoint role passed to every HTTP call and embedded in `backfill:<role>` source labels.
    http_role: std::string::String,
    // Pipeline stages run, in this order, for every replayed signature.
    transaction_model: crate::TransactionModelService,
    dex_decode_service: crate::DexDecodeService,
    dex_detect_service: crate::DexDetectService,
    launch_origin_service: crate::LaunchOriginService,
    pool_origin_service: crate::PoolOriginService,
    wallet_observation_service: crate::WalletObservationService,
    non_trade_materialization_service: crate::NonTradeEventMaterializationService,
    trade_aggregation_service: crate::TradeAggregationService,
    pair_candle_aggregation_service: crate::PairCandleAggregationService,
    transaction_classification_service: crate::TransactionClassificationService,
    /// Best-effort metadata backfill run after each historical replay.
    token_metadata_service: crate::TokenMetadataBackfillService,
}

/// Total `getTransaction` attempts per signature: the first call plus up to three retries.
/// Only `crate::Error::Http` failures are retried.
const TOKEN_BACKFILL_GET_TRANSACTION_MAX_ATTEMPTS: usize = 4;
/// Base retry delay in milliseconds; `token_backfill_retry_delay_ms` scales it by 1, 3, then 6.
const TOKEN_BACKFILL_GET_TRANSACTION_RETRY_BASE_DELAY_MS: u64 = 500;

impl TokenBackfillService {
/// Creates a new token-backfill service.
///
/// Every pipeline stage service is constructed from its own clone of `database`.
/// The token-metadata backfill service additionally receives `http_pool` and
/// `http_role`, because it performs its own RPC lookups.
///
/// # Arguments
///
/// * `http_pool` - shared HTTP RPC pool used for all signature and transaction fetches.
/// * `database` - shared database handle handed to every pipeline stage.
/// * `http_role` - endpoint role used for every RPC call; it also appears in the
///   `backfill:<role>` source label of persisted transactions and observations.
pub fn new(
    http_pool: std::sync::Arc,
    database: std::sync::Arc,
    http_role: std::string::String,
) -> Self {
    // All stages share one database handle so they read what earlier stages wrote.
    let persistence = crate::DetectionPersistenceService::new(database.clone());
    let transaction_model = crate::TransactionModelService::new(database.clone());
    let dex_decode_service = crate::DexDecodeService::new(database.clone());
    let dex_detect_service = crate::DexDetectService::new(database.clone());
    let launch_origin_service = crate::LaunchOriginService::new(database.clone());
    let pool_origin_service = crate::PoolOriginService::new(database.clone());
    let wallet_observation_service = crate::WalletObservationService::new(database.clone());
    let non_trade_materialization_service =
        crate::NonTradeEventMaterializationService::new(database.clone());
    let trade_aggregation_service = crate::TradeAggregationService::new(database.clone());
    let pair_candle_aggregation_service =
        crate::PairCandleAggregationService::new(database.clone());
    let transaction_classification_service =
        crate::TransactionClassificationService::new(database.clone());
    // Metadata backfill talks to RPC itself, so it needs the HTTP pool and role as well.
    let token_metadata_service = crate::TokenMetadataBackfillService::new(
        http_pool.clone(),
        database.clone(),
        http_role.clone(),
    );
    return Self {
        http_pool,
        database,
        persistence,
        http_role,
        transaction_model,
        dex_decode_service,
        dex_detect_service,
        launch_origin_service,
        pool_origin_service,
        wallet_observation_service,
        non_trade_materialization_service,
        trade_aggregation_service,
        pair_candle_aggregation_service,
        transaction_classification_service,
        token_metadata_service,
    };
}
/// Replays the historical activity of one token mint through the existing pipeline.
pub async fn backfill_token_by_mint( &self, token_mint: &str, mint_signature_limit: usize, pool_signature_limit: usize, ) -> Result { let mut result = crate::TokenBackfillResult { token_mint: token_mint.to_string(), mint_signature_count: 0, pool_address_count: 0, pool_signature_count: 0, unique_signature_count: 0, resolved_transaction_count: 0, missing_transaction_count: 0, transaction_fetch_error_count: 0, last_transaction_fetch_error: None, decoded_event_count: 0, detection_count: 0, launch_attribution_count: 0, pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, liquidity_event_count: 0, pool_lifecycle_event_count: 0, fee_event_count: 0, reward_event_count: 0, pool_admin_event_count: 0, pair_candle_count: 0, }; let mut seen_signatures = std::collections::HashSet::::new(); let mint_signatures_result = self .fetch_signatures_for_address(token_mint.to_string(), mint_signature_limit) .await; let mut mint_signatures = match mint_signatures_result { Ok(mint_signatures) => mint_signatures, Err(error) => return Err(error), }; result.mint_signature_count = mint_signatures.len(); mint_signatures.reverse(); for signature_status in mint_signatures { let signature = signature_status.signature.clone(); if seen_signatures.contains(signature.as_str()) { continue; } seen_signatures.insert(signature.clone()); result.unique_signature_count += 1; let replay_result = self.replay_signature(signature).await; let replay_result = match replay_result { Ok(replay_result) => replay_result, Err(error) => return Err(error), }; merge_token_backfill_signature_result(&mut result, replay_result); } let pool_addresses_result = self.collect_pool_addresses_for_token_mint(token_mint).await; let pool_addresses = match pool_addresses_result { Ok(pool_addresses) => pool_addresses, Err(error) => return Err(error), }; result.pool_address_count = pool_addresses.len(); for pool_address in pool_addresses { let pool_signatures_result = self 
.fetch_signatures_for_address(pool_address.clone(), pool_signature_limit) .await; let mut pool_signatures = match pool_signatures_result { Ok(pool_signatures) => pool_signatures, Err(error) => return Err(error), }; result.pool_signature_count += pool_signatures.len(); pool_signatures.reverse(); for signature_status in pool_signatures { let signature = signature_status.signature.clone(); if seen_signatures.contains(signature.as_str()) { continue; } seen_signatures.insert(signature.clone()); result.unique_signature_count += 1; let replay_result = self.replay_signature(signature).await; let replay_result = match replay_result { Ok(replay_result) => replay_result, Err(error) => return Err(error), }; merge_token_backfill_signature_result(&mut result, replay_result); } } self.backfill_missing_token_metadata_best_effort(100).await; self.refresh_event_coverage_best_effort().await; let summary_payload = serde_json::json!({ "tokenMint": result.token_mint, "mintSignatureCount": result.mint_signature_count, "poolAddressCount": result.pool_address_count, "poolSignatureCount": result.pool_signature_count, "uniqueSignatureCount": result.unique_signature_count, "resolvedTransactionCount": result.resolved_transaction_count, "missingTransactionCount": result.missing_transaction_count, "transactionFetchErrorCount": result.transaction_fetch_error_count, "lastTransactionFetchError": result.last_transaction_fetch_error, "decodedEventCount": result.decoded_event_count, "detectionCount": result.detection_count, "launchAttributionCount": result.launch_attribution_count, "poolOriginCount": result.pool_origin_count, "walletParticipationCount": result.wallet_participation_count, "tradeEventCount": result.trade_event_count, "liquidityEventCount": result.liquidity_event_count, "poolLifecycleEventCount": result.pool_lifecycle_event_count, "feeEventCount": result.fee_event_count, "rewardEventCount": result.reward_event_count, "poolAdminEventCount": result.pool_admin_event_count, 
"pairCandleCount": result.pair_candle_count }); let observation_result = self .persistence .record_observation(&crate::DetectionObservationInput::new( "token.backfill.completed".to_string(), crate::ObservationSourceKind::HttpRpc, Some(format!("backfill:{}", self.http_role)), token_mint.to_string(), None, summary_payload.clone(), )) .await; let observation_id = match observation_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_result = self .persistence .record_signal(&crate::DetectionSignalInput::new( "signal.token.backfill.completed".to_string(), crate::AnalysisSignalSeverity::Low, token_mint.to_string(), Some(observation_id), None, summary_payload, )) .await; if let Err(error) = signal_result { return Err(error); } return Ok(result); } async fn fetch_signatures_for_address( &self, address: std::string::String, limit: usize, ) -> Result< std::vec::Vec, crate::Error, > { let config = solana_rpc_client_api::config::RpcSignaturesForAddressConfig { before: None, until: None, limit: Some(limit), commitment: None, min_context_slot: None, }; return self .http_pool .get_signatures_for_address_for_role(self.http_role.as_str(), address, Some(config)) .await; } async fn collect_pool_addresses_for_token_mint( &self, token_mint: &str, ) -> Result, crate::Error> { let token_result = crate::query_tokens_get_by_mint(self.database.as_ref(), token_mint).await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => return Err(error), }; let token = match token_option { Some(token) => token, None => return Ok(std::vec::Vec::new()), }; let token_id = match token.id { Some(token_id) => token_id, None => { return Err(crate::Error::InvalidState(format!( "token '{}' has no internal id", token.mint ))); }, }; let pools_result = crate::query_pools_list(self.database.as_ref()).await; let pools = match pools_result { Ok(pools) => pools, Err(error) => return Err(error), }; let mut addresses = std::vec::Vec::new(); 
let mut seen = std::collections::HashSet::::new(); for pool in pools { let pool_id = match pool.id { Some(pool_id) => pool_id, None => continue, }; let pool_tokens_result = crate::query_pool_tokens_list_by_pool_id(self.database.as_ref(), pool_id).await; let pool_tokens = match pool_tokens_result { Ok(pool_tokens) => pool_tokens, Err(error) => return Err(error), }; let mut contains_token = false; for pool_token in pool_tokens { if pool_token.token_id == token_id { contains_token = true; break; } } if contains_token && !seen.contains(pool.address.as_str()) { seen.insert(pool.address.clone()); addresses.push(pool.address.clone()); } } addresses.sort(); return Ok(addresses); } async fn replay_signature( &self, signature: std::string::String, ) -> Result { let config = Some(serde_json::json!({ "encoding": "jsonParsed", "maxSupportedTransactionVersion": 0 })); let transaction_value_result = self.fetch_transaction_value_with_retry(signature.as_str(), config).await; let transaction_value = match transaction_value_result { Ok(transaction_value) => transaction_value, Err(error) => { tracing::warn!( signature = %signature, error = %error, "skipping signature after getTransaction retries failed during backfill" ); return Ok(TokenBackfillSignatureResult { resolved_transaction_count: 0, missing_transaction_count: 0, transaction_fetch_error_count: 1, last_transaction_fetch_error: Some(error.to_string()), decoded_event_count: 0, detection_count: 0, launch_attribution_count: 0, pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, liquidity_event_count: 0, pool_lifecycle_event_count: 0, fee_event_count: 0, reward_event_count: 0, pool_admin_event_count: 0, pair_candle_count: 0, }); }, }; if transaction_value.is_null() { return Ok(TokenBackfillSignatureResult { resolved_transaction_count: 0, missing_transaction_count: 1, transaction_fetch_error_count: 0, last_transaction_fetch_error: None, decoded_event_count: 0, detection_count: 0, launch_attribution_count: 0, 
pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, liquidity_event_count: 0, pool_lifecycle_event_count: 0, fee_event_count: 0, reward_event_count: 0, pool_admin_event_count: 0, pair_candle_count: 0, }); } let existing_transaction_result = crate::query_chain_transactions_get_by_signature( self.database.as_ref(), signature.as_str(), ) .await; let existing_transaction_option = match existing_transaction_result { Ok(existing_transaction_option) => existing_transaction_option, Err(error) => return Err(error), }; if existing_transaction_option.is_none() { let persist_result = self .transaction_model .persist_resolved_transaction( signature.as_str(), Some(format!("backfill:{}", self.http_role)), &transaction_value, ) .await; if let Err(error) = persist_result { return Err(error); } } let decoded_result = self .dex_decode_service .decode_transaction_by_signature(signature.as_str()) .await; let decoded = match decoded_result { Ok(decoded) => decoded, Err(error) => return Err(error), }; let detections_result = self .dex_detect_service .detect_transaction_by_signature(signature.as_str()) .await; let detections = match detections_result { Ok(detections) => detections, Err(error) => return Err(error), }; let launch_attributions_result = self .launch_origin_service .attribute_transaction_by_signature(signature.as_str()) .await; let launch_attributions = match launch_attributions_result { Ok(launch_attributions) => launch_attributions, Err(error) => return Err(error), }; let pool_origins_result = self .pool_origin_service .record_transaction_by_signature(signature.as_str()) .await; let pool_origins = match pool_origins_result { Ok(pool_origins) => pool_origins, Err(error) => return Err(error), }; let wallet_observations_result = self .wallet_observation_service .record_transaction_by_signature(signature.as_str()) .await; let wallet_observations = match wallet_observations_result { Ok(wallet_observations) => wallet_observations, Err(error) => return 
Err(error), }; let non_trade_materialization_result = self .non_trade_materialization_service .record_transaction_by_signature(signature.as_str()) .await; let non_trade_materialization = match non_trade_materialization_result { Ok(non_trade_materialization) => non_trade_materialization, Err(error) => return Err(error), }; let trade_aggregations_result = self .trade_aggregation_service .record_transaction_by_signature(signature.as_str()) .await; let trade_aggregations = match trade_aggregations_result { Ok(trade_aggregations) => trade_aggregations, Err(error) => return Err(error), }; let pair_candle_aggregations_result = self .pair_candle_aggregation_service .record_transaction_by_signature(signature.as_str()) .await; let pair_candle_aggregations = match pair_candle_aggregations_result { Ok(pair_candle_aggregations) => pair_candle_aggregations, Err(error) => return Err(error), }; let transaction_classification_result = self .transaction_classification_service .classify_transaction_by_signature(signature.as_str()) .await; if let Err(error) = transaction_classification_result { return Err(error); } let instruction_observation_index = crate::InstructionObservationIndexService::new(self.database.clone()); let instruction_observation_result = instruction_observation_index.refresh_signature(signature.as_str()).await; match instruction_observation_result { Ok(index_result) => { tracing::debug!( signature = %signature, upserted_observation_count = index_result.upserted_observation_count, "instruction observation index refreshed after signature replay" ); }, Err(error) => { tracing::warn!( signature = %signature, error = %error, "instruction observation index refresh failed after signature replay" ); }, } return Ok(TokenBackfillSignatureResult { resolved_transaction_count: 1, missing_transaction_count: 0, transaction_fetch_error_count: 0, last_transaction_fetch_error: None, decoded_event_count: decoded.len(), detection_count: detections.len(), launch_attribution_count: 
launch_attributions.len(), pool_origin_count: pool_origins.len(), wallet_participation_count: wallet_observations.len(), trade_event_count: trade_aggregations.len(), liquidity_event_count: non_trade_materialization.liquidity_event_count, pool_lifecycle_event_count: non_trade_materialization.pool_lifecycle_event_count, fee_event_count: non_trade_materialization.fee_event_count, reward_event_count: non_trade_materialization.reward_event_count, pool_admin_event_count: non_trade_materialization.pool_admin_event_count, pair_candle_count: pair_candle_aggregations.len(), }); } /// Replays the historical activity of one pool address through the existing pipeline. pub async fn backfill_pool_by_address( &self, pool_address: &str, pool_signature_limit: usize, ) -> Result { let effective_limit = if pool_signature_limit > 1000 { 1000 } else { pool_signature_limit }; let mut result = crate::PoolBackfillResult { pool_address: pool_address.to_string(), pool_signature_count: 0, unique_signature_count: 0, resolved_transaction_count: 0, missing_transaction_count: 0, transaction_fetch_error_count: 0, last_transaction_fetch_error: None, decoded_event_count: 0, detection_count: 0, launch_attribution_count: 0, pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, liquidity_event_count: 0, pool_lifecycle_event_count: 0, fee_event_count: 0, reward_event_count: 0, pool_admin_event_count: 0, pair_candle_count: 0, }; let mut seen_addresses = std::collections::BTreeSet::::new(); let mut addresses_to_scan = std::vec::Vec::::new(); let trimmed_pool_address = pool_address.trim().to_string(); if trimmed_pool_address.is_empty() { return Err(crate::Error::Config("pool_address must not be empty".to_string())); } seen_addresses.insert(trimmed_pool_address.clone()); addresses_to_scan.push(trimmed_pool_address.clone()); let pool_result = crate::query_pools_get_by_address( self.database.as_ref(), trimmed_pool_address.as_str(), ) .await; let pool_option = match pool_result { 
Ok(pool_option) => pool_option, Err(error) => return Err(error), }; if let Some(pool) = pool_option { let pool_id = match pool.id { Some(pool_id) => pool_id, None => { return Err(crate::Error::InvalidState(format!( "pool '{}' has no internal id", pool.address ))); }, }; let pool_tokens_result = crate::query_pool_tokens_list_by_pool_id(self.database.as_ref(), pool_id).await; let pool_tokens = match pool_tokens_result { Ok(pool_tokens) => pool_tokens, Err(error) => return Err(error), }; for pool_token in pool_tokens { let vault_address_option = pool_token.vault_address.clone(); let vault_address = match vault_address_option { Some(vault_address) => vault_address.trim().to_string(), None => continue, }; if vault_address.is_empty() { continue; } if seen_addresses.contains(vault_address.as_str()) { continue; } seen_addresses.insert(vault_address.clone()); addresses_to_scan.push(vault_address); } } let mut seen_signatures = std::collections::HashSet::::new(); for address in &addresses_to_scan { let signatures_result = self.fetch_signatures_for_address(address.clone(), effective_limit).await; let mut signatures = match signatures_result { Ok(signatures) => signatures, Err(error) => return Err(error), }; if address == &trimmed_pool_address { result.pool_signature_count = signatures.len(); } signatures.reverse(); for signature_status in signatures { let signature = signature_status.signature.clone(); if seen_signatures.contains(signature.as_str()) { continue; } seen_signatures.insert(signature.clone()); result.unique_signature_count += 1; let replay_result = self.replay_signature(signature).await; let replay_result = match replay_result { Ok(replay_result) => replay_result, Err(error) => return Err(error), }; result.resolved_transaction_count += replay_result.resolved_transaction_count; result.missing_transaction_count += replay_result.missing_transaction_count; result.transaction_fetch_error_count += replay_result.transaction_fetch_error_count; if 
replay_result.last_transaction_fetch_error.is_some() { result.last_transaction_fetch_error = replay_result.last_transaction_fetch_error.clone(); } result.decoded_event_count += replay_result.decoded_event_count; result.detection_count += replay_result.detection_count; result.launch_attribution_count += replay_result.launch_attribution_count; result.pool_origin_count += replay_result.pool_origin_count; result.wallet_participation_count += replay_result.wallet_participation_count; result.trade_event_count += replay_result.trade_event_count; result.liquidity_event_count += replay_result.liquidity_event_count; result.pool_lifecycle_event_count += replay_result.pool_lifecycle_event_count; result.fee_event_count += replay_result.fee_event_count; result.reward_event_count += replay_result.reward_event_count; result.pool_admin_event_count += replay_result.pool_admin_event_count; result.pair_candle_count += replay_result.pair_candle_count; } } self.backfill_missing_token_metadata_best_effort(100).await; self.refresh_event_coverage_best_effort().await; let summary_payload = serde_json::json!({ "poolAddress": result.pool_address, "poolSignatureCount": result.pool_signature_count, "uniqueSignatureCount": result.unique_signature_count, "resolvedTransactionCount": result.resolved_transaction_count, "missingTransactionCount": result.missing_transaction_count, "transactionFetchErrorCount": result.transaction_fetch_error_count, "lastTransactionFetchError": result.last_transaction_fetch_error, "decodedEventCount": result.decoded_event_count, "detectionCount": result.detection_count, "launchAttributionCount": result.launch_attribution_count, "poolOriginCount": result.pool_origin_count, "walletParticipationCount": result.wallet_participation_count, "tradeEventCount": result.trade_event_count, "liquidityEventCount": result.liquidity_event_count, "poolLifecycleEventCount": result.pool_lifecycle_event_count, "feeEventCount": result.fee_event_count, "rewardEventCount": 
result.reward_event_count, "poolAdminEventCount": result.pool_admin_event_count, "pairCandleCount": result.pair_candle_count, "scannedAddressCount": addresses_to_scan.len(), "effectiveSignatureLimit": effective_limit }); let observation_result = self .persistence .record_observation(&crate::DetectionObservationInput::new( "pool.backfill.completed".to_string(), crate::ObservationSourceKind::HttpRpc, Some(format!("backfill:{}", self.http_role)), pool_address.to_string(), None, summary_payload.clone(), )) .await; let observation_id = match observation_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_result = self .persistence .record_signal(&crate::DetectionSignalInput::new( "signal.pool.backfill.completed".to_string(), crate::AnalysisSignalSeverity::Low, pool_address.to_string(), Some(observation_id), None, summary_payload, )) .await; if let Err(error) = signal_result { return Err(error); } return Ok(result); } /// Replays one known transaction signature through the existing pipeline. 
///
/// The signature is trimmed and must not be empty. A failed or `null`
/// `getTransaction` lookup is reported through the result counters rather than as an
/// error. After replay, token metadata and DEX event coverage are refreshed on a
/// best-effort basis. A `signature.backfill.completed` observation and a linked
/// `signal.signature.backfill.completed` signal are then recorded.
///
/// # Errors
///
/// Returns `crate::Error::Config` for an empty signature. Replay and persistence errors
/// are propagated.
pub async fn backfill_signature(
    &self,
    signature: &str,
) -> Result {
    let trimmed_signature = signature.trim().to_string();
    if trimmed_signature.is_empty() {
        return Err(crate::Error::Config("signature must not be empty".to_string()));
    }
    let replay_result = self.replay_signature(trimmed_signature.clone()).await;
    let replay = match replay_result {
        Ok(replay) => replay,
        Err(error) => return Err(error),
    };
    // Best-effort follow-ups: failures are only logged inside these helpers.
    self.backfill_missing_token_metadata_best_effort(100).await;
    self.refresh_event_coverage_best_effort().await;
    // Copy the single replay summary into the public result type.
    let result = crate::SignatureBackfillResult {
        signature: trimmed_signature.clone(),
        resolved_transaction_count: replay.resolved_transaction_count,
        missing_transaction_count: replay.missing_transaction_count,
        transaction_fetch_error_count: replay.transaction_fetch_error_count,
        last_transaction_fetch_error: replay.last_transaction_fetch_error.clone(),
        decoded_event_count: replay.decoded_event_count,
        detection_count: replay.detection_count,
        launch_attribution_count: replay.launch_attribution_count,
        pool_origin_count: replay.pool_origin_count,
        wallet_participation_count: replay.wallet_participation_count,
        trade_event_count: replay.trade_event_count,
        liquidity_event_count: replay.liquidity_event_count,
        pool_lifecycle_event_count: replay.pool_lifecycle_event_count,
        fee_event_count: replay.fee_event_count,
        reward_event_count: replay.reward_event_count,
        pool_admin_event_count: replay.pool_admin_event_count,
        pair_candle_count: replay.pair_candle_count,
    };
    let summary_payload = serde_json::json!({
        "signature": result.signature.clone(),
        "resolvedTransactionCount": result.resolved_transaction_count,
        "missingTransactionCount": result.missing_transaction_count,
        "transactionFetchErrorCount": result.transaction_fetch_error_count,
        "lastTransactionFetchError": result.last_transaction_fetch_error,
        "decodedEventCount": result.decoded_event_count,
        "detectionCount": result.detection_count,
        "launchAttributionCount": result.launch_attribution_count,
        "poolOriginCount": result.pool_origin_count,
        "walletParticipationCount": result.wallet_participation_count,
        "tradeEventCount": result.trade_event_count,
        "liquidityEventCount": result.liquidity_event_count,
        "poolLifecycleEventCount": result.pool_lifecycle_event_count,
        "feeEventCount": result.fee_event_count,
        "rewardEventCount": result.reward_event_count,
        "poolAdminEventCount": result.pool_admin_event_count,
        "pairCandleCount": result.pair_candle_count
    });
    // The observation is recorded first so the signal can reference its id.
    let observation_result = self
        .persistence
        .record_observation(&crate::DetectionObservationInput::new(
            "signature.backfill.completed".to_string(),
            crate::ObservationSourceKind::HttpRpc,
            Some(format!("backfill:{}", self.http_role)),
            trimmed_signature.clone(),
            None,
            summary_payload.clone(),
        ))
        .await;
    let observation_id = match observation_result {
        Ok(observation_id) => observation_id,
        Err(error) => return Err(error),
    };
    let signal_result = self
        .persistence
        .record_signal(&crate::DetectionSignalInput::new(
            "signal.signature.backfill.completed".to_string(),
            crate::AnalysisSignalSeverity::Low,
            trimmed_signature,
            Some(observation_id),
            None,
            summary_payload,
        ))
        .await;
    if let Err(error) = signal_result {
        return Err(error);
    }
    return Ok(result);
}

/// Calls `getTransaction` for `signature`, retrying HTTP failures with a growing delay.
///
/// Up to `TOKEN_BACKFILL_GET_TRANSACTION_MAX_ATTEMPTS` attempts are made. Only errors
/// accepted by `token_backfill_should_retry_http_error` (i.e. `crate::Error::Http`)
/// are retried. With the current constants the waits between attempts are 500 ms,
/// 1500 ms and 3000 ms. A successful call returns the raw response value unchanged;
/// callers check it for `null` themselves.
///
/// # Errors
///
/// Returns the last error once attempts are exhausted, or immediately for a
/// non-retryable error.
async fn fetch_transaction_value_with_retry(
    &self,
    signature: &str,
    config: std::option::Option,
) -> Result {
    // 1-based so the logged attempt number and the delay schedule line up.
    let mut attempt_index = 1usize;
    loop {
        let transaction_value_result = self
            .http_pool
            .get_transaction_raw_for_role(
                self.http_role.as_str(),
                signature.to_string(),
                config.clone(),
            )
            .await;
        match transaction_value_result {
            Ok(transaction_value) => return Ok(transaction_value),
            Err(error) => {
                if !token_backfill_should_retry_http_error(&error)
                    || attempt_index >= TOKEN_BACKFILL_GET_TRANSACTION_MAX_ATTEMPTS
                {
                    return Err(error);
                }
                let delay_ms = token_backfill_retry_delay_ms(attempt_index);
                tracing::warn!(
                    signature = %signature,
                    attempt = attempt_index,
                    delay_ms = delay_ms,
                    error = %error,
                    "getTransaction failed during backfill; retrying"
                );
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                attempt_index += 1;
            },
        }
    }
}

/// Runs the token-metadata backfill for up to `limit` tokens and logs the outcome.
///
/// Failures are logged at `warn` and never propagated, so a metadata problem cannot
/// fail a completed historical replay.
async fn backfill_missing_token_metadata_best_effort(&self, limit: i64) {
    let metadata_result =
        self.token_metadata_service.backfill_missing_token_metadata(Some(limit)).await;
    match metadata_result {
        Ok(metadata_result) => {
            tracing::debug!(
                total_token_count = metadata_result.total_token_count,
                attempted_token_count = metadata_result.attempted_token_count,
                local_metadata_count = metadata_result.local_metadata_count,
                mint_account_metadata_count = metadata_result.mint_account_metadata_count,
                metaplex_metadata_count = metadata_result.metaplex_metadata_count,
                updated_token_count = metadata_result.updated_token_count,
                skipped_token_count = metadata_result.skipped_token_count,
                error_count = metadata_result.error_count,
                "token metadata backfill completed after historical replay"
            );
        },
        Err(error) => {
            tracing::warn!(
                error = %error,
                "token metadata backfill failed after historical replay"
            );
        },
    }
}

/// Refreshes the local DEX event coverage counts and logs the outcome.
///
/// Failures are logged at `warn` and never propagated.
async fn refresh_event_coverage_best_effort(&self) {
    let coverage_service = crate::DexEventCoverageService::new(self.database.clone());
    let refresh_result = coverage_service.refresh_local_counts(None).await;
    match refresh_result {
        Ok(refresh_result) => {
            tracing::debug!(
                upserted_entry_count = refresh_result.upserted_entry_count,
                summary_count = refresh_result.summaries.len(),
                "dex event coverage refreshed after historical replay"
            );
        },
        Err(error) => {
            tracing::warn!(
                error = %error,
                "dex event coverage refresh failed after historical replay"
            );
        },
    }
}
}

/// Returns whether a `getTransaction` error is worth retrying.
///
/// Only transport-level `crate::Error::Http` failures qualify.
fn token_backfill_should_retry_http_error(error: &crate::Error) -> bool {
    match error {
        crate::Error::Http(_) => return true,
        _ => return false,
    }
}

/// Returns the delay in milliseconds to wait after failed attempt `attempt_index`.
///
/// The base delay is scaled by 1, 3, and then 6 for every later attempt. The caller
/// starts counting at 1, so the `0` arm is only a defensive fallback.
fn token_backfill_retry_delay_ms(attempt_index: usize) -> u64 {
    let multiplier = match attempt_index {
        0 => 1,
        1 => 1,
        2 => 3,
        _ => 6,
    };
    return TOKEN_BACKFILL_GET_TRANSACTION_RETRY_BASE_DELAY_MS * multiplier;
}

/// Counters produced by replaying a single signature.
///
/// These are merged into the run-level result structs. At most one of the three
/// transaction-level counters (resolved / missing / fetch error) is non-zero.
#[derive(Debug, Clone, Default)]
struct TokenBackfillSignatureResult {
    resolved_transaction_count: usize,
    missing_transaction_count: usize,
    transaction_fetch_error_count: usize,
    last_transaction_fetch_error: std::option::Option,
    decoded_event_count: usize,
    detection_count: usize,
    launch_attribution_count: usize,
    pool_origin_count: usize,
    wallet_participation_count: usize,
    trade_event_count: usize,
    liquidity_event_count: usize,
    pool_lifecycle_event_count: usize,
    fee_event_count: usize,
    reward_event_count: usize,
    pool_admin_event_count: usize,
    pair_candle_count: usize,
}

/// Adds one replay summary into a token-backfill aggregate.
///
/// All counters are summed. `last_transaction_fetch_error` is replaced only when the
/// new summary carries an error, so the most recent error wins.
fn merge_token_backfill_signature_result(
    aggregate: &mut crate::TokenBackfillResult,
    value: TokenBackfillSignatureResult,
) {
    aggregate.resolved_transaction_count += value.resolved_transaction_count;
    aggregate.missing_transaction_count += value.missing_transaction_count;
    aggregate.transaction_fetch_error_count += value.transaction_fetch_error_count;
    if value.last_transaction_fetch_error.is_some() {
        aggregate.last_transaction_fetch_error = value.last_transaction_fetch_error.clone();
    }
    aggregate.decoded_event_count += value.decoded_event_count;
    aggregate.detection_count += value.detection_count;
    aggregate.launch_attribution_count += value.launch_attribution_count;
    aggregate.pool_origin_count += value.pool_origin_count;
    aggregate.wallet_participation_count += value.wallet_participation_count;
    aggregate.trade_event_count += value.trade_event_count;
    aggregate.liquidity_event_count += value.liquidity_event_count;
    aggregate.pool_lifecycle_event_count += value.pool_lifecycle_event_count;
    aggregate.fee_event_count += value.fee_event_count;
    aggregate.reward_event_count += value.reward_event_count;
    aggregate.pool_admin_event_count += value.pool_admin_event_count;
    aggregate.pair_candle_count += value.pair_candle_count;
}

#[cfg(test)]
mod tests {
    use tokio::io::AsyncReadExt;
    use tokio::io::AsyncWriteExt;

    #[derive(Debug)]
    struct TestBackfillHttpServer {
        url: std::string::String,
        shutdown_tx: std::option::Option>,
    }

    impl TestBackfillHttpServer {
        async fn spawn() -> Self {
            let
listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; let listener = match listener_result { Ok(listener) => listener, Err(error) => panic!("listener bind must succeed: {error}"), }; let local_addr_result = listener.local_addr(); let local_addr = match local_addr_result { Ok(local_addr) => local_addr, Err(error) => panic!("local addr must exist: {error}"), }; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); tokio::spawn(async move { loop { tokio::select! { _ = &mut shutdown_rx => { break; } accept_result = listener.accept() => { let (mut stream, _) = match accept_result { Ok(pair) => pair, Err(_) => break, }; tokio::spawn(async move { let mut buffer = vec![0u8; 65536]; let read_result = stream.read(&mut buffer).await; let bytes_read = match read_result { Ok(bytes_read) => bytes_read, Err(_) => return, }; if bytes_read == 0 { return; } let request_text = std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); let body_split = request_text.split("\r\n\r\n").collect::>(); if body_split.len() < 2 { return; } let body = body_split[1]; let request_value_result = serde_json::from_str::(body); let request_value = match request_value_result { Ok(request_value) => request_value, Err(_) => return, }; let method = request_value .get("method") .and_then(serde_json::Value::as_str) .unwrap_or_default() .to_string(); let id_value = match request_value.get("id") { Some(id_value) => id_value.clone(), None => serde_json::Value::from(1_u64), }; let response_body = if method == "getSignaturesForAddress" { let params = request_value .get("params") .and_then(serde_json::Value::as_array) .cloned() .unwrap_or_default(); let address = params .first() .and_then(serde_json::Value::as_str) .unwrap_or_default() .to_string(); if address == "BackfillToken111" { serde_json::json!({ "jsonrpc": "2.0", "result": [ { "signature": "sig-backfill-swap-1", "slot": 2002_u64, "err": null, "memo": null, "blockTime": 1779500002_i64, 
"confirmationStatus": "finalized" }, { "signature": "sig-backfill-create-1", "slot": 2001_u64, "err": null, "memo": null, "blockTime": 1779500001_i64, "confirmationStatus": "finalized" } ], "id": id_value }).to_string() } else if address == "BackfillPool111" { serde_json::json!({ "jsonrpc": "2.0", "result": [ { "signature": "sig-backfill-swap-1", "slot": 2002_u64, "err": null, "memo": null, "blockTime": 1779500002_i64, "confirmationStatus": "finalized" } ], "id": id_value }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", "result": [], "id": id_value }).to_string() } } else if method == "getTransaction" { let params = request_value .get("params") .and_then(serde_json::Value::as_array) .cloned() .unwrap_or_default(); let signature = params .first() .and_then(serde_json::Value::as_str) .unwrap_or_default() .to_string(); if signature == "sig-backfill-create-1" { serde_json::json!({ "jsonrpc": "2.0", "result": { "slot": 2001, "blockTime": 1779500001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::FLUXBEAM_PROGRAM_ID, "program": "fluxbeam", "stackHeight": 1, "accounts": [ "BackfillPool111", "BackfillLpMint111", "BackfillToken111", crate::WSOL_MINT_ID, "BackfillCreator111" ], "parsed": { "info": { "instruction": "create_pool", "pool": "BackfillPool111", "lpMint": "BackfillLpMint111", "tokenA": "BackfillToken111", "tokenB": crate::WSOL_MINT_ID, "payer": "BackfillCreator111" } }, "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: Instruction: CreatePool" ] } }, "id": id_value }).to_string() } else if signature == "sig-backfill-swap-1" { serde_json::json!({ "jsonrpc": "2.0", "result": { "slot": 2002, "blockTime": 1779500002, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::FLUXBEAM_PROGRAM_ID, "program": "fluxbeam", "stackHeight": 1, "accounts": [ "BackfillPool111", "BackfillLpMint111", "BackfillToken111", crate::WSOL_MINT_ID ], "parsed": { "info": { 
"instruction": "swap", "pool": "BackfillPool111", "tokenA": "BackfillToken111", "tokenB": crate::WSOL_MINT_ID, "baseAmountRaw": "1000", "quoteAmountRaw": "2500" } }, "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: Instruction: Swap", "Program log: buy" ] } }, "id": id_value }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", "result": serde_json::Value::Null, "id": id_value }).to_string() } } else if method == "getProgramAccounts" { serde_json::json!({ "jsonrpc": "2.0", "result": [], "id": id_value }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", "result": serde_json::Value::Null, "id": id_value }).to_string() }; let response_text = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", response_body.len(), response_body ); let write_result = stream.write_all(response_text.as_bytes()).await; if write_result.is_err() { return; } let _ = stream.shutdown().await; }); } } } }); return Self { url: format!("http://{}", local_addr), shutdown_tx: Some(shutdown_tx), }; } async fn shutdown(mut self) { let shutdown_tx_option = self.shutdown_tx.take(); if let Some(shutdown_tx) = shutdown_tx_option { let _ = shutdown_tx.send(()); } } } async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("token_backfill.sqlite3"); let config = crate::DatabaseConfig { enabled: true, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::Database::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => 
panic!("database init must succeed: {}", error), }; return std::sync::Arc::new(database); } fn make_http_endpoint_config(url: std::string::String) -> crate::HttpEndpointConfig { return crate::HttpEndpointConfig { name: "backfill_http".to_string(), enabled: true, provider: "test".to_string(), url, api_key_env_var: None, roles: vec!["history_backfill".to_string()], requests_per_second: 100, burst_capacity: 100, send_transaction_requests_per_second: None, send_transaction_burst_capacity: None, heavy_requests_per_second: None, heavy_burst_capacity: None, connect_timeout_ms: 5_000, request_timeout_ms: 5_000, max_idle_connections_per_host: 8, pause_after_http_429_ms: None, max_concurrent_requests_per_endpoint: 16, }; } #[tokio::test] async fn backfill_token_by_mint_reconstructs_pool_and_trade() { let server = TestBackfillHttpServer::spawn().await; let database = make_database().await; let pool_result = crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config( server.url.clone(), )]); let http_pool = match pool_result { Ok(http_pool) => std::sync::Arc::new(http_pool), Err(error) => panic!("http pool creation must succeed: {}", error), }; let service = crate::TokenBackfillService::new( http_pool, database.clone(), "history_backfill".to_string(), ); let backfill_result = service.backfill_token_by_mint("BackfillToken111", 20, 20).await; let backfill = match backfill_result { Ok(backfill) => backfill, Err(error) => panic!("backfill must succeed: {}", error), }; assert_eq!(backfill.mint_signature_count, 2); assert_eq!(backfill.pool_address_count, 1); assert_eq!(backfill.pool_signature_count, 1); assert_eq!(backfill.unique_signature_count, 2); assert_eq!(backfill.resolved_transaction_count, 2); assert_eq!(backfill.missing_transaction_count, 0); assert_eq!(backfill.trade_event_count, 1); assert!(backfill.pair_candle_count > 0); let token_result = crate::query_tokens_get_by_mint(database.as_ref(), "BackfillToken111").await; let token_option = match 
token_result { Ok(token_option) => token_option, Err(error) => panic!("token fetch must succeed: {}", error), }; let token = match token_option { Some(token) => token, None => panic!("token must exist"), }; assert!(token.id.is_some()); let pool_result = crate::query_pools_get_by_address(database.as_ref(), "BackfillPool111").await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => panic!("pool fetch must succeed: {}", error), }; let pool = match pool_option { Some(pool) => pool, None => panic!("pool must exist"), }; let pool_id = match pool.id { Some(pool_id) => pool_id, None => panic!("pool must have an id"), }; let pair_result = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => panic!("pair fetch must succeed: {}", error), }; let pair = match pair_option { Some(pair) => pair, None => panic!("pair must exist"), }; let pair_id = match pair.id { Some(pair_id) => pair_id, None => panic!("pair must have an id"), }; let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), pair_id).await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); assert_eq!(trade_events[0].price_quote_per_base, Some(2.5)); let pair_metric_result = crate::query_pair_metrics_get_by_pair_id(database.as_ref(), pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => panic!("pair metric fetch must succeed: {}", error), }; let pair_metric = match pair_metric_option { Some(pair_metric) => pair_metric, None => panic!("pair metric must exist"), }; assert_eq!(pair_metric.trade_count, 1); assert_eq!(pair_metric.buy_count, 1); let candles_result = crate::query_pair_candles_list_by_pair_and_timeframe(database.as_ref(), pair_id, 60) .await; let 
candles = match candles_result { Ok(candles) => candles, Err(error) => panic!("pair candle list must succeed: {}", error), }; assert_eq!(candles.len(), 1); assert_eq!(candles[0].trade_count, 1); assert_eq!(candles[0].close_price_quote_per_base, 2.5); server.shutdown().await; } #[tokio::test] async fn backfill_token_by_mint_is_state_idempotent() { let server = TestBackfillHttpServer::spawn().await; let database = make_database().await; let pool_result = crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config( server.url.clone(), )]); let http_pool = match pool_result { Ok(http_pool) => std::sync::Arc::new(http_pool), Err(error) => panic!("http pool creation must succeed: {}", error), }; let service = crate::TokenBackfillService::new( http_pool, database.clone(), "history_backfill".to_string(), ); let first_result = service.backfill_token_by_mint("BackfillToken111", 20, 20).await; if let Err(error) = first_result { panic!("first backfill must succeed: {}", error); } let second_result = service.backfill_token_by_mint("BackfillToken111", 20, 20).await; if let Err(error) = second_result { panic!("second backfill must succeed: {}", error); } let token_result = crate::query_tokens_get_by_mint(database.as_ref(), "BackfillToken111").await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => panic!("token fetch must succeed: {}", error), }; let token = match token_option { Some(token) => token, None => panic!("token must exist"), }; let token_id = token.id.unwrap_or_default(); let pools_result = crate::query_pools_list(database.as_ref()).await; let pools = match pools_result { Ok(pools) => pools, Err(error) => panic!("pool list must succeed: {}", error), }; assert_eq!(pools.len(), 1); let pool_id = pools[0].id.unwrap_or_default(); let pool_tokens_result = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id).await; let pool_tokens = match pool_tokens_result { Ok(pool_tokens) => pool_tokens, Err(error) 
=> panic!("pool token list must succeed: {}", error), }; let mut found_token = false; for pool_token in pool_tokens { if pool_token.token_id == token_id { found_token = true; } } assert!(found_token); let pair_result = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => panic!("pair fetch must succeed: {}", error), }; let pair = match pair_option { Some(pair) => pair, None => panic!("pair must exist"), }; let pair_id = pair.id.unwrap_or_default(); let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), pair_id).await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); let pair_metrics_result = crate::query_pair_metrics_list(database.as_ref()).await; let pair_metrics = match pair_metrics_result { Ok(pair_metrics) => pair_metrics, Err(error) => panic!("pair metric list must succeed: {}", error), }; assert_eq!(pair_metrics.len(), 1); server.shutdown().await; } }