diff --git a/CHANGELOG.md b/CHANGELOG.md index 37802d3..3bcef4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,3 +48,4 @@ 0.7.15 - Ajout d’une couche wallets observés et participations observées, avec extraction des rôles depuis les payloads décodés et rattachement transaction / decoded event / pool / pair 0.7.16 - Ajout d’une première couche trade events et pair metrics avec normalisation des swaps, agrégation par paire et branchement automatique dans le pipeline de résolution transactionnelle 0.7.17 - Ajout d’une première couche WS hybride avec collecte de cibles `programSubscribe` / `accountSubscribe` et persistance technique dédupliquée des notifications `logs / program / account` +0.7.18 - Ajout d’un premier backfill historique ciblé par token mint, basé sur `getSignaturesForAddress` + `getTransaction`, avec réutilisation du pipeline interne pour reconstruire transactions, pools, swaps, origins, wallets et métriques diff --git a/Cargo.toml b/Cargo.toml index 48d6c40..0b4d920 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.17" +version = "0.7.18" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 6f0401d..16f2bdc 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -625,17 +625,14 @@ Réalisé : - ajout d’une façade runtime pour exposer ce comportement au futur branchement `ws_manager`. ### 6.050. Version `0.7.18` — Backfill historique ciblé par token -Objectif : permettre la reconstruction historique ciblée d’un token encore actif non observé en live. 
+Réalisé : -À faire : - -- ajouter une entrée `token_mint -> backfill`, -- retrouver les signatures historiques via `getSignaturesForAddress`, -- résoudre les transactions pertinentes via `getTransaction`, -- retrouver les pools et paires liés au token, -- reconstruire les swaps observables et les métriques dérivées, -- cibler prioritairement des tokens encore actifs comme `USDC`, `USDT`, `RAY`, `JUP`, -- éviter un scan exhaustif de toute la blockchain. +- ajout d’un premier service de backfill ciblé par `token_mint`, +- récupération des signatures historiques via `getSignaturesForAddress`, +- résolution des transactions pertinentes via `getTransaction`, +- relecture du pipeline interne pour reconstruire transactions, décodage DEX, détection métier, origins, wallets et trade metrics, +- ajout d’une seconde passe sur les pools découverts pour le token afin de récupérer des signatures supplémentaires liées à l’activité du pool, +- conservation d’un périmètre ciblé sur des tokens encore actifs au lieu d’un scan exhaustif de la blockchain. ### 6.051. Version `0.7.19` — Holdings observés Objectif : compléter la couche acteurs observés avec une première vision des balances et comptes token observés, sans viser encore un portefeuille temps réel exhaustif. diff --git a/kb_lib/src/http_pool.rs b/kb_lib/src/http_pool.rs index 4eb53cf..ab99eb4 100644 --- a/kb_lib/src/http_pool.rs +++ b/kb_lib/src/http_pool.rs @@ -278,6 +278,60 @@ impl HttpEndpointPool { }; client.get_transaction_raw(signature, config).await } + + /// Executes `getProgramAccounts` through the pool and returns the raw result value. 
+    pub async fn get_program_accounts_raw_for_role(
+        &self,
+        required_role: &str,
+        program_id: std::string::String,
+        config: std::option::Option<serde_json::Value>,
+    ) -> Result<serde_json::Value, crate::KbError> {
+        let client_result = self
+            .select_client_for_role_and_method(required_role, "getProgramAccounts")
+            .await;
+        let client = match client_result {
+            Ok(client) => client,
+            Err(error) => return Err(error),
+        };
+        client.get_program_accounts_raw(program_id, config).await
+    }
+
+    /// Executes `getSignaturesForAddress` through the pool and returns the raw result value.
+    pub async fn get_signatures_for_address_raw_for_role(
+        &self,
+        required_role: &str,
+        address: std::string::String,
+        config: std::option::Option<serde_json::Value>,
+    ) -> Result<serde_json::Value, crate::KbError> {
+        let client_result = self
+            .select_client_for_role_and_method(required_role, "getSignaturesForAddress")
+            .await;
+        let client = match client_result {
+            Ok(client) => client,
+            Err(error) => return Err(error),
+        };
+        client.get_signatures_for_address_raw(address, config).await
+    }
+
+    /// Executes typed `getSignaturesForAddress` through the pool.
+    pub async fn get_signatures_for_address_for_role(
+        &self,
+        required_role: &str,
+        address: std::string::String,
+        config: std::option::Option<solana_rpc_client_api::config::RpcSignaturesForAddressConfig>,
+    ) -> Result<
+        std::vec::Vec<solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature>,
+        crate::KbError,
+    > {
+        let client_result = self
+            .select_client_for_role_and_method(required_role, "getSignaturesForAddress")
+            .await;
+        let client = match client_result {
+            Ok(client) => client,
+            Err(error) => return Err(error),
+        };
+        client.get_signatures_for_address(address, config).await
+    }
 }
 
 fn kb_pool_build_optional_config_only_params(
diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs
index 36398ac..82045f8 100644
--- a/kb_lib/src/lib.rs
+++ b/kb_lib/src/lib.rs
@@ -22,6 +22,7 @@ mod json_rpc_ws;
 mod launch_origin;
 mod pool_origin;
 mod solana_pubsub_ws;
+mod token_backfill;
 mod tracing;
 mod trade_aggregation;
 mod tx_model;
@@ -280,6 +281,8 @@ pub use pool_origin::KbPoolOriginService;
 pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
 pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
 pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
+pub use token_backfill::KbTokenBackfillResult;
+pub use token_backfill::KbTokenBackfillService;
 pub use tracing::KbTracingGuard;
 pub use tracing::init_tracing;
 pub use trade_aggregation::KbTradeAggregationResult;
diff --git a/kb_lib/src/token_backfill.rs b/kb_lib/src/token_backfill.rs
new file mode 100644
index 0000000..82ad245
--- /dev/null
+++ b/kb_lib/src/token_backfill.rs
@@ -0,0 +1,916 @@
+// file: kb_lib/src/token_backfill.rs
+
+//! Historical token backfill service.
+
+/// One token-backfill result summary.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct KbTokenBackfillResult {
+    /// Input token mint.
+    pub token_mint: std::string::String,
+    /// Number of signatures returned directly for the mint.
+    pub mint_signature_count: usize,
+    /// Number of pool addresses discovered for the token after the first replay pass.
+    pub pool_address_count: usize,
+    /// Number of signatures returned from those pool addresses.
+    pub pool_signature_count: usize,
+    /// Number of unique signatures processed during this run.
+    pub unique_signature_count: usize,
+    /// Number of transactions resolved through HTTP during this run.
+    pub resolved_transaction_count: usize,
+    /// Number of signatures whose `getTransaction` lookup returned `null`.
+    pub missing_transaction_count: usize,
+    /// Total number of decoded DEX events replayed during this run.
+    pub decoded_event_count: usize,
+    /// Total number of DEX detection results produced during this run.
+    pub detection_count: usize,
+    /// Total number of launch-attribution results produced during this run.
+    pub launch_attribution_count: usize,
+    /// Total number of pool-origin results produced during this run.
+    pub pool_origin_count: usize,
+    /// Total number of wallet-participation observations produced during this run.
+    pub wallet_participation_count: usize,
+    /// Total number of trade-aggregation results produced during this run.
+    pub trade_event_count: usize,
+}
+
+/// Historical token backfill service.
+///
+/// This service reuses the existing transaction projection and downstream
+/// DEX pipeline instead of introducing a separate historical code path.
+#[derive(Debug, Clone)]
+pub struct KbTokenBackfillService {
+    http_pool: std::sync::Arc<crate::HttpEndpointPool>,
+    database: std::sync::Arc<crate::KbDatabase>,
+    persistence: crate::KbDetectionPersistenceService,
+    http_role: std::string::String,
+    transaction_model: crate::KbTransactionModelService,
+    dex_decode_service: crate::KbDexDecodeService,
+    dex_detect_service: crate::KbDexDetectService,
+    launch_origin_service: crate::KbLaunchOriginService,
+    pool_origin_service: crate::KbPoolOriginService,
+    wallet_observation_service: crate::KbWalletObservationService,
+    trade_aggregation_service: crate::KbTradeAggregationService,
+}
+
+impl KbTokenBackfillService {
+    /// Creates a new token-backfill service.
+    pub fn new(
+        http_pool: std::sync::Arc<crate::HttpEndpointPool>,
+        database: std::sync::Arc<crate::KbDatabase>,
+        http_role: std::string::String,
+    ) -> Self {
+        let persistence = crate::KbDetectionPersistenceService::new(database.clone());
+        let transaction_model = crate::KbTransactionModelService::new(database.clone());
+        let dex_decode_service = crate::KbDexDecodeService::new(database.clone());
+        let dex_detect_service = crate::KbDexDetectService::new(database.clone());
+        let launch_origin_service = crate::KbLaunchOriginService::new(database.clone());
+        let pool_origin_service = crate::KbPoolOriginService::new(database.clone());
+        let wallet_observation_service = crate::KbWalletObservationService::new(database.clone());
+        let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone());
+        Self {
+            http_pool,
+            database,
+            persistence,
+            http_role,
+            transaction_model,
+            dex_decode_service,
+            dex_detect_service,
+            launch_origin_service,
+            pool_origin_service,
+            wallet_observation_service,
+            trade_aggregation_service,
+        }
+    }
+
+    /// Replays the historical activity of one token mint through the existing pipeline.
+    pub async fn backfill_token_by_mint(
+        &self,
+        token_mint: &str,
+        mint_signature_limit: usize,
+        pool_signature_limit: usize,
+    ) -> Result<crate::KbTokenBackfillResult, crate::KbError> {
+        let mut result = crate::KbTokenBackfillResult {
+            token_mint: token_mint.to_string(),
+            mint_signature_count: 0,
+            pool_address_count: 0,
+            pool_signature_count: 0,
+            unique_signature_count: 0,
+            resolved_transaction_count: 0,
+            missing_transaction_count: 0,
+            decoded_event_count: 0,
+            detection_count: 0,
+            launch_attribution_count: 0,
+            pool_origin_count: 0,
+            wallet_participation_count: 0,
+            trade_event_count: 0,
+        };
+        let mut seen_signatures = std::collections::HashSet::<std::string::String>::new();
+        let mint_signatures_result = self
+            .fetch_signatures_for_address(token_mint.to_string(), mint_signature_limit)
+            .await;
+        let mut mint_signatures = match mint_signatures_result {
+            Ok(mint_signatures) => mint_signatures,
+            Err(error) => return Err(error),
+        };
+        result.mint_signature_count = mint_signatures.len();
+        mint_signatures.reverse();
+        for signature_status in mint_signatures {
+            let signature = signature_status.signature.clone();
+            if seen_signatures.contains(signature.as_str()) {
+                continue;
+            }
+            seen_signatures.insert(signature.clone());
+            result.unique_signature_count += 1;
+            let replay_result = self.replay_signature(signature).await;
+            let replay_result = match replay_result {
+                Ok(replay_result) => replay_result,
+                Err(error) => return Err(error),
+            };
+            kb_merge_token_backfill_signature_result(&mut result, replay_result);
+        }
+        let pool_addresses_result = self.collect_pool_addresses_for_token_mint(token_mint).await;
+        let pool_addresses = match pool_addresses_result {
+            Ok(pool_addresses) => pool_addresses,
+            Err(error) => return Err(error),
+        };
+        result.pool_address_count = pool_addresses.len();
+        for pool_address in pool_addresses {
+            let pool_signatures_result = self
+                .fetch_signatures_for_address(pool_address.clone(), pool_signature_limit)
+                .await;
+            let mut pool_signatures = match pool_signatures_result {
Ok(pool_signatures) => pool_signatures, + Err(error) => return Err(error), + }; + result.pool_signature_count += pool_signatures.len(); + pool_signatures.reverse(); + for signature_status in pool_signatures { + let signature = signature_status.signature.clone(); + if seen_signatures.contains(signature.as_str()) { + continue; + } + seen_signatures.insert(signature.clone()); + result.unique_signature_count += 1; + let replay_result = self.replay_signature(signature).await; + let replay_result = match replay_result { + Ok(replay_result) => replay_result, + Err(error) => return Err(error), + }; + kb_merge_token_backfill_signature_result(&mut result, replay_result); + } + } + let summary_payload = serde_json::json!({ + "tokenMint": result.token_mint, + "mintSignatureCount": result.mint_signature_count, + "poolAddressCount": result.pool_address_count, + "poolSignatureCount": result.pool_signature_count, + "uniqueSignatureCount": result.unique_signature_count, + "resolvedTransactionCount": result.resolved_transaction_count, + "missingTransactionCount": result.missing_transaction_count, + "decodedEventCount": result.decoded_event_count, + "detectionCount": result.detection_count, + "launchAttributionCount": result.launch_attribution_count, + "poolOriginCount": result.pool_origin_count, + "walletParticipationCount": result.wallet_participation_count, + "tradeEventCount": result.trade_event_count + }); + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "token.backfill.completed".to_string(), + crate::KbObservationSourceKind::HttpRpc, + Some(format!("backfill:{}", self.http_role)), + token_mint.to_string(), + None, + summary_payload.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + 
"signal.token.backfill.completed".to_string(), + crate::KbAnalysisSignalSeverity::Low, + token_mint.to_string(), + Some(observation_id), + None, + summary_payload, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + Ok(result) + } + + async fn fetch_signatures_for_address( + &self, + address: std::string::String, + limit: usize, + ) -> Result< + std::vec::Vec, + crate::KbError, + > { + let config = solana_rpc_client_api::config::RpcSignaturesForAddressConfig { + before: None, + until: None, + limit: Some(limit), + commitment: None, + min_context_slot: None, + }; + self.http_pool + .get_signatures_for_address_for_role(self.http_role.as_str(), address, Some(config)) + .await + } + + async fn collect_pool_addresses_for_token_mint( + &self, + token_mint: &str, + ) -> Result, crate::KbError> { + let token_result = crate::get_token_by_mint(self.database.as_ref(), token_mint).await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => return Err(error), + }; + let token = match token_option { + Some(token) => token, + None => return Ok(std::vec::Vec::new()), + }; + let token_id = match token.id { + Some(token_id) => token_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "token '{}' has no internal id", + token.mint + ))); + } + }; + let pools_result = crate::list_pools(self.database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => return Err(error), + }; + let mut addresses = std::vec::Vec::new(); + let mut seen = std::collections::HashSet::::new(); + for pool in pools { + let pool_id = match pool.id { + Some(pool_id) => pool_id, + None => continue, + }; + let pool_tokens_result = + crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await; + let pool_tokens = match pool_tokens_result { + Ok(pool_tokens) => pool_tokens, + Err(error) => return Err(error), + }; + let mut contains_token = false; + for pool_token in pool_tokens { + 
                if pool_token.token_id == token_id {
+                    contains_token = true;
+                    break;
+                }
+            }
+            if contains_token && !seen.contains(pool.address.as_str()) {
+                seen.insert(pool.address.clone());
+                addresses.push(pool.address.clone());
+            }
+        }
+        addresses.sort();
+        Ok(addresses)
+    }
+
+    async fn replay_signature(
+        &self,
+        signature: std::string::String,
+    ) -> Result<KbTokenBackfillSignatureResult, crate::KbError> {
+        let config = Some(serde_json::json!({
+            "encoding": "jsonParsed",
+            "maxSupportedTransactionVersion": 0
+        }));
+        let transaction_value_result = self
+            .http_pool
+            .get_transaction_raw_for_role(self.http_role.as_str(), signature.clone(), config)
+            .await;
+        let transaction_value = match transaction_value_result {
+            Ok(transaction_value) => transaction_value,
+            Err(error) => return Err(error),
+        };
+        if transaction_value.is_null() {
+            return Ok(KbTokenBackfillSignatureResult {
+                resolved_transaction_count: 0,
+                missing_transaction_count: 1,
+                decoded_event_count: 0,
+                detection_count: 0,
+                launch_attribution_count: 0,
+                pool_origin_count: 0,
+                wallet_participation_count: 0,
+                trade_event_count: 0,
+            });
+        }
+        let existing_transaction_result = crate::get_chain_transaction_by_signature(
+            self.database.as_ref(),
+            signature.as_str(),
+        )
+        .await;
+        let existing_transaction_option = match existing_transaction_result {
+            Ok(existing_transaction_option) => existing_transaction_option,
+            Err(error) => return Err(error),
+        };
+        if existing_transaction_option.is_none() {
+            let persist_result = self
+                .transaction_model
+                .persist_resolved_transaction(
+                    signature.as_str(),
+                    Some(format!("backfill:{}", self.http_role)),
+                    &transaction_value,
+                )
+                .await;
+            if let Err(error) = persist_result {
+                return Err(error);
+            }
+        }
+        let decoded_result = self
+            .dex_decode_service
+            .decode_transaction_by_signature(signature.as_str())
+            .await;
+        let decoded = match decoded_result {
+            Ok(decoded) => decoded,
+            Err(error) => return Err(error),
+        };
+        let detections_result = self
+            .dex_detect_service
.detect_transaction_by_signature(signature.as_str()) + .await; + let detections = match detections_result { + Ok(detections) => detections, + Err(error) => return Err(error), + }; + let launch_attributions_result = self + .launch_origin_service + .attribute_transaction_by_signature(signature.as_str()) + .await; + let launch_attributions = match launch_attributions_result { + Ok(launch_attributions) => launch_attributions, + Err(error) => return Err(error), + }; + let pool_origins_result = self + .pool_origin_service + .record_transaction_by_signature(signature.as_str()) + .await; + let pool_origins = match pool_origins_result { + Ok(pool_origins) => pool_origins, + Err(error) => return Err(error), + }; + let wallet_observations_result = self + .wallet_observation_service + .record_transaction_by_signature(signature.as_str()) + .await; + let wallet_observations = match wallet_observations_result { + Ok(wallet_observations) => wallet_observations, + Err(error) => return Err(error), + }; + let trade_aggregations_result = self + .trade_aggregation_service + .record_transaction_by_signature(signature.as_str()) + .await; + let trade_aggregations = match trade_aggregations_result { + Ok(trade_aggregations) => trade_aggregations, + Err(error) => return Err(error), + }; + Ok(KbTokenBackfillSignatureResult { + resolved_transaction_count: 1, + missing_transaction_count: 0, + decoded_event_count: decoded.len(), + detection_count: detections.len(), + launch_attribution_count: launch_attributions.len(), + pool_origin_count: pool_origins.len(), + wallet_participation_count: wallet_observations.len(), + trade_event_count: trade_aggregations.len(), + }) + } +} + +#[derive(Debug, Clone, Default)] +struct KbTokenBackfillSignatureResult { + resolved_transaction_count: usize, + missing_transaction_count: usize, + decoded_event_count: usize, + detection_count: usize, + launch_attribution_count: usize, + pool_origin_count: usize, + wallet_participation_count: usize, + trade_event_count: 
usize,
+}
+
+fn kb_merge_token_backfill_signature_result(
+    aggregate: &mut crate::KbTokenBackfillResult,
+    value: KbTokenBackfillSignatureResult,
+) {
+    aggregate.resolved_transaction_count += value.resolved_transaction_count;
+    aggregate.missing_transaction_count += value.missing_transaction_count;
+    aggregate.decoded_event_count += value.decoded_event_count;
+    aggregate.detection_count += value.detection_count;
+    aggregate.launch_attribution_count += value.launch_attribution_count;
+    aggregate.pool_origin_count += value.pool_origin_count;
+    aggregate.wallet_participation_count += value.wallet_participation_count;
+    aggregate.trade_event_count += value.trade_event_count;
+}
+
+#[cfg(test)]
+mod tests {
+    use tokio::io::AsyncReadExt;
+    use tokio::io::AsyncWriteExt;
+
+    #[derive(Debug)]
+    struct TestBackfillHttpServer {
+        url: std::string::String,
+        shutdown_tx: std::option::Option<tokio::sync::oneshot::Sender<()>>,
+    }
+
+    impl TestBackfillHttpServer {
+        async fn spawn() -> Self {
+            let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
+            let listener = match listener_result {
+                Ok(listener) => listener,
+                Err(error) => panic!("listener bind must succeed: {error}"),
+            };
+            let local_addr_result = listener.local_addr();
+            let local_addr = match local_addr_result {
+                Ok(local_addr) => local_addr,
+                Err(error) => panic!("local addr must exist: {error}"),
+            };
+            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
+            tokio::spawn(async move {
+                loop {
+                    tokio::select!
{ + _ = &mut shutdown_rx => { + break; + } + accept_result = listener.accept() => { + let (mut stream, _) = match accept_result { + Ok(pair) => pair, + Err(_) => break, + }; + tokio::spawn(async move { + let mut buffer = vec![0u8; 65536]; + let read_result = stream.read(&mut buffer).await; + let bytes_read = match read_result { + Ok(bytes_read) => bytes_read, + Err(_) => return, + }; + if bytes_read == 0 { + return; + } + let request_text = std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); + let body_split = request_text.split("\r\n\r\n").collect::>(); + if body_split.len() < 2 { + return; + } + let body = body_split[1]; + let request_value_result = serde_json::from_str::(body); + let request_value = match request_value_result { + Ok(request_value) => request_value, + Err(_) => return, + }; + let method = request_value + .get("method") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .to_string(); + let id_value = match request_value.get("id") { + Some(id_value) => id_value.clone(), + None => serde_json::Value::from(1_u64), + }; + let response_body = if method == "getSignaturesForAddress" { + let params = request_value + .get("params") + .and_then(serde_json::Value::as_array) + .cloned() + .unwrap_or_default(); + let address = params + .first() + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .to_string(); + if address == "BackfillToken111" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": [ + { + "signature": "sig-backfill-swap-1", + "slot": 2002_u64, + "err": null, + "memo": null, + "blockTime": 1779500002_i64, + "confirmationStatus": "finalized" + }, + { + "signature": "sig-backfill-create-1", + "slot": 2001_u64, + "err": null, + "memo": null, + "blockTime": 1779500001_i64, + "confirmationStatus": "finalized" + } + ], + "id": id_value + }).to_string() + } else if address == "BackfillPool111" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": [ + { + "signature": "sig-backfill-swap-1", + "slot": 
2002_u64, + "err": null, + "memo": null, + "blockTime": 1779500002_i64, + "confirmationStatus": "finalized" + } + ], + "id": id_value + }).to_string() + } else { + serde_json::json!({ + "jsonrpc": "2.0", + "result": [], + "id": id_value + }).to_string() + } + } else if method == "getTransaction" { + let params = request_value + .get("params") + .and_then(serde_json::Value::as_array) + .cloned() + .unwrap_or_default(); + let signature = params + .first() + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .to_string(); + if signature == "sig-backfill-create-1" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": { + "slot": 2001, + "blockTime": 1779500001, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_FLUXBEAM_PROGRAM_ID, + "program": "fluxbeam", + "stackHeight": 1, + "accounts": [ + "BackfillPool111", + "BackfillLpMint111", + "BackfillToken111", + "So11111111111111111111111111111111111111112", + "BackfillCreator111" + ], + "parsed": { + "info": { + "instruction": "create_pool", + "pool": "BackfillPool111", + "lpMint": "BackfillLpMint111", + "tokenA": "BackfillToken111", + "tokenB": "So11111111111111111111111111111111111111112", + "payer": "BackfillCreator111" + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: CreatePool" + ] + } + }, + "id": id_value + }).to_string() + } else if signature == "sig-backfill-swap-1" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": { + "slot": 2002, + "blockTime": 1779500002, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_FLUXBEAM_PROGRAM_ID, + "program": "fluxbeam", + "stackHeight": 1, + "accounts": [ + "BackfillPool111", + "BackfillLpMint111", + "BackfillToken111", + "So11111111111111111111111111111111111111112" + ], + "parsed": { + "info": { + "instruction": "swap", + "pool": "BackfillPool111", + "tokenA": "BackfillToken111", + "tokenB": 
"So11111111111111111111111111111111111111112", + "baseAmountRaw": "1000", + "quoteAmountRaw": "2500" + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: Swap", + "Program log: buy" + ] + } + }, + "id": id_value + }).to_string() + } else { + serde_json::json!({ + "jsonrpc": "2.0", + "result": serde_json::Value::Null, + "id": id_value + }).to_string() + } + } else if method == "getProgramAccounts" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": [], + "id": id_value + }).to_string() + } else { + serde_json::json!({ + "jsonrpc": "2.0", + "result": serde_json::Value::Null, + "id": id_value + }).to_string() + }; + let response_text = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + let write_result = stream.write_all(response_text.as_bytes()).await; + if write_result.is_err() { + return; + } + let _ = stream.shutdown().await; + }); + } + } + } + }); + Self { + url: format!("http://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + } + } + async fn shutdown(mut self) { + let shutdown_tx_option = self.shutdown_tx.take(); + if let Some(shutdown_tx) = shutdown_tx_option { + let _ = shutdown_tx.send(()); + } + } + } + + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("token_backfill.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = 
crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + fn make_http_endpoint_config(url: std::string::String) -> crate::KbHttpEndpointConfig { + crate::KbHttpEndpointConfig { + name: "backfill_http".to_string(), + enabled: true, + provider: "test".to_string(), + url, + api_key_env_var: None, + roles: vec!["history_backfill".to_string()], + requests_per_second: 100, + burst_capacity: 100, + send_transaction_requests_per_second: None, + send_transaction_burst_capacity: None, + heavy_requests_per_second: None, + heavy_burst_capacity: None, + connect_timeout_ms: 5_000, + request_timeout_ms: 5_000, + max_idle_connections_per_host: 8, + pause_after_http_429_ms: None, + max_concurrent_requests_per_endpoint: 16, + } + } + + #[tokio::test] + async fn backfill_token_by_mint_reconstructs_pool_and_trade() { + let server = TestBackfillHttpServer::spawn().await; + let database = make_database().await; + let pool_result = + crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config( + server.url.clone(), + )]); + let http_pool = match pool_result { + Ok(http_pool) => std::sync::Arc::new(http_pool), + Err(error) => panic!("http pool creation must succeed: {}", error), + }; + let service = crate::KbTokenBackfillService::new( + http_pool, + database.clone(), + "history_backfill".to_string(), + ); + let backfill_result = service + .backfill_token_by_mint("BackfillToken111", 20, 20) + .await; + let backfill = match backfill_result { + Ok(backfill) => backfill, + Err(error) => panic!("backfill must succeed: {}", error), + }; + assert_eq!(backfill.mint_signature_count, 2); + assert_eq!(backfill.pool_address_count, 1); + assert_eq!(backfill.pool_signature_count, 1); + assert_eq!(backfill.unique_signature_count, 2); + assert_eq!(backfill.resolved_transaction_count, 2); + 
assert_eq!(backfill.missing_transaction_count, 0); + assert_eq!(backfill.trade_event_count, 1); + let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => panic!("token fetch must succeed: {}", error), + }; + let token = match token_option { + Some(token) => token, + None => panic!("token must exist"), + }; + assert!(token.id.is_some()); + let pool_result = crate::get_pool_by_address(database.as_ref(), "BackfillPool111").await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => panic!("pool fetch must succeed: {}", error), + }; + let pool = match pool_option { + Some(pool) => pool, + None => panic!("pool must exist"), + }; + let pool_id = match pool.id { + Some(pool_id) => pool_id, + None => panic!("pool must have an id"), + }; + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = match pair.id { + Some(pair_id) => pair_id, + None => panic!("pair must have an id"), + }; + let trade_events_result = + crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => panic!("trade event list must succeed: {}", error), + }; + assert_eq!(trade_events.len(), 1); + assert_eq!(trade_events[0].price_quote_per_base, Some(2.5)); + let pair_metric_result = + crate::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await; + let pair_metric_option = match pair_metric_result { + Ok(pair_metric_option) => pair_metric_option, + Err(error) => panic!("pair metric fetch must succeed: {}", error), + }; + let pair_metric = match 
pair_metric_option { + Some(pair_metric) => pair_metric, + None => panic!("pair metric must exist"), + }; + assert_eq!(pair_metric.trade_count, 1); + assert_eq!(pair_metric.buy_count, 1); + server.shutdown().await; + } + + #[tokio::test] + async fn backfill_token_by_mint_is_state_idempotent() { + let server = TestBackfillHttpServer::spawn().await; + let database = make_database().await; + let pool_result = + crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config( + server.url.clone(), + )]); + let http_pool = match pool_result { + Ok(http_pool) => std::sync::Arc::new(http_pool), + Err(error) => panic!("http pool creation must succeed: {}", error), + }; + let service = crate::KbTokenBackfillService::new( + http_pool, + database.clone(), + "history_backfill".to_string(), + ); + let first_result = service + .backfill_token_by_mint("BackfillToken111", 20, 20) + .await; + if let Err(error) = first_result { + panic!("first backfill must succeed: {}", error); + } + + let second_result = service + .backfill_token_by_mint("BackfillToken111", 20, 20) + .await; + if let Err(error) = second_result { + panic!("second backfill must succeed: {}", error); + } + let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => panic!("token fetch must succeed: {}", error), + }; + let token = match token_option { + Some(token) => token, + None => panic!("token must exist"), + }; + let token_id = token.id.unwrap_or_default(); + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pool list must succeed: {}", error), + }; + assert_eq!(pools.len(), 1); + let pool_id = pools[0].id.unwrap_or_default(); + let pool_tokens_result = + crate::list_pool_tokens_by_pool_id(database.as_ref(), pool_id).await; + let pool_tokens = match pool_tokens_result { + 
Ok(pool_tokens) => pool_tokens, + Err(error) => panic!("pool token list must succeed: {}", error), + }; + let mut found_token = false; + for pool_token in pool_tokens { + if pool_token.token_id == token_id { + found_token = true; + } + } + assert!(found_token); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = pair.id.unwrap_or_default(); + let trade_events_result = + crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => panic!("trade event list must succeed: {}", error), + }; + assert_eq!(trade_events.len(), 1); + let pair_metrics_result = crate::list_pair_metrics(database.as_ref()).await; + let pair_metrics = match pair_metrics_result { + Ok(pair_metrics) => pair_metrics, + Err(error) => panic!("pair metric list must succeed: {}", error), + }; + assert_eq!(pair_metrics.len(), 1); + server.shutdown().await; + } +}