diff --git a/CHANGELOG.md b/CHANGELOG.md index 430e5be..0b9ae07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,3 +35,4 @@ 0.7.2 - Ajout du premier décodeur DEX spécifique Raydium AmmV4 / initialize2, persistance des événements DEX décodés et branchement automatique du décodage après résolution/projection transactionnelle 0.7.3 - Ajout de la détection métier depuis les événements DEX décodés, avec alimentation de kb_pools, kb_pairs, kb_pool_tokens et kb_pool_listings, et signaux de première apparition 0.7.4 - Ajout du premier lot multi-DEX v1 avec décodeurs Pump.fun (create_v2) et PumpSwap (buy/sell), plus détection métier Pump.fun vers token/pool/pair/listing +0.7.5 - Enrichissement de PumpSwap avec extraction des mints et du pool_v2, persistance des événements décodés enrichis et détection métier automatique pool/pair/listing diff --git a/Cargo.toml b/Cargo.toml index e0533d2..a603a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.4" +version = "0.7.5" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index e207b15..d620390 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -498,16 +498,14 @@ Réalisé : - préparation de l’extension vers `Meteora`, `Meteora DBC` et `LaunchLab`. ### 6.037. Version `0.7.5` — Connecteurs DEX v1, vague 2 -Objectif : couvrir les DEX AMM et agrégateurs secondaires nécessaires à une détection large des nouveaux pools. +Réalisé : -À faire : - -- ajouter les décodeurs et la détection métier pour `Orca`, -- compléter la couverture `Raydium` au-delà de `AmmV4 / initialize2` si nécessaire, -- ajouter les décodeurs et la détection métier pour `Bags`, -- ajouter les décodeurs et la détection métier pour `FluxBeam`, -- ajouter les décodeurs et la détection métier pour `DexLab`, -- harmoniser la sortie métier entre tous les connecteurs pris en charge. +- enrichissement du décodeur `PumpSwap` avec extraction des mints et du `pool_v2`, +- persistance des événements `PumpSwap` enrichis dans `kb_dex_decoded_events`, +- ajout de la détection métier `PumpSwap` vers `pool / pair / listing`, +- émission des signaux dédiés `new_pool`, `new_pair` et `first_listing_seen`, +- garantie d’idempotence sur une même transaction déjà traitée, +- préparation du lot suivant pour `Meteora`, `Meteora DBC` et `LaunchLab`. ### 6.038. Version `0.7.6` — Connecteurs DEX v1, vague 3 Objectif : compléter la couverture des protocoles restants et stabiliser l’intégration globale. diff --git a/kb_lib/src/dex/pump_swap.rs b/kb_lib/src/dex/pump_swap.rs index 6a556ba..83dfb9e 100644 --- a/kb_lib/src/dex/pump_swap.rs +++ b/kb_lib/src/dex/pump_swap.rs @@ -20,6 +20,12 @@ pub struct KbPumpSwapTradeDecoded { pub trade_side: crate::KbSwapTradeSide, /// Optional heuristic pool account. pub pool_account: std::option::Option, + /// Optional token A mint. + pub token_a_mint: std::option::Option, + /// Optional token B mint. + pub token_b_mint: std::option::Option, + /// Optional pool_v2 account or upgraded pool account. + pub pool_v2: std::option::Option, /// Decoded payload. pub payload_json: serde_json::Value, } @@ -94,7 +100,52 @@ impl KbPumpSwapDecoder { Ok(accounts) => accounts, Err(error) => return Err(error), }; - let pool_account = kb_extract_account(&accounts, 0); + let parsed_json_result = + kb_parse_optional_parsed_json(instruction.parsed_json.as_ref()); + let parsed_json = match parsed_json_result { + Ok(parsed_json) => parsed_json, + Err(error) => return Err(error), + }; + let pool_account = kb_extract_string_by_candidate_keys( + parsed_json.as_ref(), + &["pool", "poolAccount", "amm", "ammPool", "poolState"], + ) + .or_else(|| kb_extract_account(&accounts, 0)); + let token_a_mint = kb_extract_string_by_candidate_keys( + parsed_json.as_ref(), + &[ + "tokenAMint", + "baseMint", + "mintA", + "coinMint", + "token0Mint", + "mint0", + ], + ) + .or_else(|| kb_extract_account(&accounts, 1)); + let token_b_mint = kb_extract_string_by_candidate_keys( + parsed_json.as_ref(), + &[ + "tokenBMint", + "quoteMint", + "mintB", + "pcMint", + "token1Mint", + "mint1", + ], + ) + .or_else(|| kb_extract_account(&accounts, 2)); + let pool_v2 = kb_extract_string_by_candidate_keys( + parsed_json.as_ref(), + &[ + "poolV2", + "pool_v2", + "ammV2", + "bondingCurveV2", + "bonding_curve_v2", + ], + ) + .or_else(|| kb_extract_account(&accounts, 3)); let is_buy = kb_log_messages_contain_keyword(&log_messages, "buy"); let is_sell = kb_log_messages_contain_keyword(&log_messages, "sell"); if !is_buy && !is_sell { @@ -106,8 +157,12 @@ impl KbPumpSwapDecoder { "instructionId": instruction_id, "instructionIndex": instruction.instruction_index, "accounts": accounts, + "parsed": parsed_json, "logMessages": log_messages, - "poolAccount": pool_account + "poolAccount": pool_account, + "tokenAMint": token_a_mint, + "tokenBMint": token_b_mint, + "poolV2": pool_v2 }); if is_buy { decoded_events.push(crate::KbPumpSwapDecodedEvent::BuyTrade( @@ -118,6 +173,9 @@ impl KbPumpSwapDecoder { program_id: program_id.clone(), trade_side: crate::KbSwapTradeSide::BuyBase, pool_account: pool_account.clone(), + token_a_mint: token_a_mint.clone(), + token_b_mint: token_b_mint.clone(), + pool_v2: pool_v2.clone(), payload_json: payload_json.clone(), }, )); @@ -131,6 +189,9 @@ impl KbPumpSwapDecoder { program_id: program_id.clone(), trade_side: crate::KbSwapTradeSide::SellBase, pool_account, + token_a_mint, + token_b_mint, + pool_v2, payload_json, }, )); @@ -213,6 +274,69 @@ fn kb_extract_account( Some(accounts[index].clone()) } +fn kb_parse_optional_parsed_json( + parsed_json: std::option::Option<&std::string::String>, +) -> Result, crate::KbError> { + let parsed_json = match parsed_json { + Some(parsed_json) => parsed_json, + None => return Ok(None), + }; + let value_result = serde_json::from_str::(parsed_json.as_str()); + match value_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse instruction parsed_json '{}': {}", + parsed_json, error + ))), + } +} + +fn kb_extract_string_by_candidate_keys( + value: std::option::Option<&serde_json::Value>, + candidate_keys: &[&str], +) -> std::option::Option { + let value = match value { + Some(value) => value, + None => return None, + }; + kb_extract_string_by_candidate_keys_inner(value, candidate_keys) +} + +fn kb_extract_string_by_candidate_keys_inner( + value: &serde_json::Value, + candidate_keys: &[&str], +) -> std::option::Option { + if let Some(object) = value.as_object() { + for candidate_key in candidate_keys { + let direct_option = object.get(*candidate_key); + if let Some(direct) = direct_option { + let direct_text_option = direct.as_str(); + if let Some(direct_text) = direct_text_option { + return Some(direct_text.to_string()); + } + } + } + for nested_value in object.values() { + let nested_result = + kb_extract_string_by_candidate_keys_inner(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + return None; + } + if let Some(array) = value.as_array() { + for nested_value in array { + let nested_result = + kb_extract_string_by_candidate_keys_inner(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + } + None +} + #[cfg(test)] mod tests { fn make_transaction_with_buy_log() -> crate::KbChainTransactionDto { @@ -252,10 +376,20 @@ mod tests { Some(crate::KB_PUMP_SWAP_PROGRAM_ID.to_string()), Some("pump-amm".to_string()), Some(1), - serde_json::json!(["PumpPool111", "Other1", "Other2"]).to_string(), - None, + serde_json::json!(["PumpPool111", "TokenA111", "TokenB111", "PoolV2_111"]).to_string(), None, None, + Some( + serde_json::json!({ + "info": { + "pool": "PumpPool111", + "baseMint": "TokenA111", + "quoteMint": "TokenB111", + "poolV2": "PoolV2_111" + } + }) + .to_string(), + ), ); dto.id = Some(18); dto @@ -266,7 +400,6 @@ mod tests { let decoder = crate::KbPumpSwapDecoder::new(); let transaction = make_transaction_with_buy_log(); let instructions = vec![make_instruction()]; - let decoded_result = decoder.decode_transaction(&transaction, &instructions); let decoded = match decoded_result { Ok(decoded) => decoded, @@ -278,6 +411,9 @@ mod tests { assert_eq!(event.transaction_id, 92); assert_eq!(event.instruction_id, 18); assert_eq!(event.pool_account, Some("PumpPool111".to_string())); + assert_eq!(event.token_a_mint, Some("TokenA111".to_string())); + assert_eq!(event.token_b_mint, Some("TokenB111".to_string())); + assert_eq!(event.pool_v2, Some("PoolV2_111".to_string())); assert_eq!(event.trade_side, crate::KbSwapTradeSide::BuyBase); } crate::KbPumpSwapDecodedEvent::SellTrade(_) => { diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs index 52f1b8b..778f953 100644 --- a/kb_lib/src/dex_decode.rs +++ b/kb_lib/src/dex_decode.rs @@ -392,9 +392,9 @@ impl KbDexDecodeService { event_kind.to_string(), event.pool_account.clone(), None, - None, - None, - None, + event.token_a_mint.clone(), + event.token_b_mint.clone(), + event.pool_v2.clone(), payload_json, ); let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await; @@ -595,6 +595,60 @@ mod tests { } } + async fn seed_projected_pump_swap_transaction( + database: std::sync::Arc, + signature: &str, + ) { + let service = crate::KbTransactionModelService::new(database); + let resolved_transaction = serde_json::json!({ + "slot": 999003, + "blockTime": 1779000003, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_PUMP_SWAP_PROGRAM_ID, + "program": "pump-amm", + "stackHeight": 1, + "accounts": [ + "PumpSwapPool111", + "PumpSwapTokenA111", + "PumpSwapTokenB111", + "PumpSwapPoolV2_111" + ], + "parsed": { + "info": { + "pool": "PumpSwapPool111", + "baseMint": "PumpSwapTokenA111", + "quoteMint": "PumpSwapTokenB111", + "poolV2": "PumpSwapPoolV2_111" + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: Buy" + ] + } + }); + let persist_result = service + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = persist_result { + panic!("projection must succeed: {}", error); + } + } + #[tokio::test] async fn decode_transaction_by_signature_persists_decoded_raydium_event() { let database = make_database().await; @@ -634,4 +688,34 @@ mod tests { ); assert_eq!(decoded[0].token_a_mint, Some("MintPF111".to_string())); } + + #[tokio::test] + async fn decode_transaction_by_signature_persists_decoded_pump_swap_event() { + let database = make_database().await; + seed_projected_pump_swap_transaction(database.clone(), "sig-dex-decode-pumpswap-1").await; + let service = crate::KbDexDecodeService::new(database.clone()); + let decoded_result = service + .decode_transaction_by_signature("sig-dex-decode-pumpswap-1") + .await; + let decoded = match decoded_result { + Ok(decoded) => decoded, + Err(error) => panic!("decode must succeed: {}", error), + }; + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0].protocol_name, "pump_swap"); + assert_eq!(decoded[0].event_kind, "pump_swap.buy"); + assert_eq!(decoded[0].pool_account, Some("PumpSwapPool111".to_string())); + assert_eq!( + decoded[0].token_a_mint, + Some("PumpSwapTokenA111".to_string()) + ); + assert_eq!( + decoded[0].token_b_mint, + Some("PumpSwapTokenB111".to_string()) + ); + assert_eq!( + decoded[0].market_account, + Some("PumpSwapPoolV2_111".to_string()) + ); + } } diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs index a0a8bc0..f6d771a 100644 --- a/kb_lib/src/dex_detect.rs +++ b/kb_lib/src/dex_detect.rs @@ -105,6 +105,30 @@ impl KbDexDetectService { }; detection_results.push(detect_result); } + if decoded_event.protocol_name == "pump_swap" + && decoded_event.event_kind == "pump_swap.buy" + { + let detect_result = self + .detect_pump_swap_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } + if decoded_event.protocol_name == "pump_swap" + && decoded_event.event_kind == "pump_swap.sell" + { + let detect_result = self + .detect_pump_swap_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } } Ok(detection_results) } @@ -614,6 +638,276 @@ impl KbDexDetectService { }) } + async fn detect_pump_swap_trade( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbDexDecodedEventDto, + ) -> Result { + let decoded_event_id_option = decoded_event.id; + let decoded_event_id = match decoded_event_id_option { + Some(decoded_event_id) => decoded_event_id, + None => { + return Err(crate::KbError::InvalidState( + "decoded dex event has no internal id".to_string(), + )); + } + }; + let dex_id_result = self.ensure_pump_swap_dex().await; + let dex_id = match dex_id_result { + Ok(dex_id) => dex_id, + Err(error) => return Err(error), + }; + let pool_address_option = decoded_event.pool_account.clone(); + let pool_address = match pool_address_option { + Some(pool_address) => pool_address, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no pool_account", + decoded_event_id + ))); + } + }; + let token_a_mint_option = decoded_event.token_a_mint.clone(); + let token_a_mint = match token_a_mint_option { + Some(token_a_mint) => token_a_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_a_mint", + decoded_event_id + ))); + } + }; + let token_b_mint_option = decoded_event.token_b_mint.clone(); + let token_b_mint = match token_b_mint_option { + Some(token_b_mint) => token_b_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_b_mint", + decoded_event_id + ))); + } + }; + let base_is_token_a = + kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str()); + let base_mint = if base_is_token_a { + token_a_mint.clone() + } else { + token_b_mint.clone() + }; + let quote_mint = if base_is_token_a { + token_b_mint.clone() + } else { + token_a_mint.clone() + }; + let base_token_id_result = self.ensure_token(base_mint.as_str()).await; + let base_token_id = match base_token_id_result { + Ok(base_token_id) => base_token_id, + Err(error) => return Err(error), + }; + let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; + let quote_token_id = match quote_token_id_result { + Ok(quote_token_id) => quote_token_id, + Err(error) => return Err(error), + }; + let existing_pool_result = + crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; + let existing_pool_option = match existing_pool_result { + Ok(existing_pool_option) => existing_pool_option, + Err(error) => return Err(error), + }; + let created_pool = existing_pool_option.is_none(); + let pool_id = match existing_pool_option { + Some(pool) => { + let pool_id_option = pool.id; + match pool_id_option { + Some(pool_id) => pool_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pool '{}' has no internal id", + pool.address + ))); + } + } + } + None => { + let pool_dto = crate::KbPoolDto::new( + dex_id, + pool_address.clone(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ); + let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + match upsert_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + } + } + }; + let existing_pair_result = + crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_pair_option = match existing_pair_result { + Ok(existing_pair_option) => existing_pair_option, + Err(error) => return Err(error), + }; + let created_pair = existing_pair_option.is_none(); + let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); + let pair_id = match existing_pair_option { + Some(pair) => { + let pair_id_option = pair.id; + match pair_id_option { + Some(pair_id) => pair_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pair for pool '{}' has no internal id", + pool_id + ))); + } + } + } + None => { + let pair_dto = crate::KbPairDto::new( + dex_id, + pool_id, + base_token_id, + quote_token_id, + pair_symbol, + ); + let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + match upsert_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), + } + } + }; + let upsert_base_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + base_token_id, + crate::KbPoolTokenRole::Base, + None, + Some(0), + ), + ) + .await; + if let Err(error) = upsert_base_pool_token_result { + return Err(error); + } + let upsert_quote_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + quote_token_id, + crate::KbPoolTokenRole::Quote, + None, + Some(1), + ), + ) + .await; + if let Err(error) = upsert_quote_pool_token_result { + return Err(error); + } + let existing_listing_result = + crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_listing_option = match existing_listing_result { + Ok(existing_listing_option) => existing_listing_option, + Err(error) => return Err(error), + }; + let created_listing = existing_listing_option.is_none(); + let pool_listing_id = match existing_listing_option { + Some(pool_listing) => pool_listing.id, + None => { + let listing_id_result = self + .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) + .await; + match listing_id_result { + Ok(listing_id) => Some(listing_id), + Err(error) => return Err(error), + } + } + }; + let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => return Err(error), + }; + if created_pool { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_swap.new_pool", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_pair { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_swap.new_pair", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_listing { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_swap.first_listing_seen", + crate::KbAnalysisSignalSeverity::Low, + payload_value, + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(crate::KbDexPoolDetectionResult { + decoded_event_id, + dex_id, + pool_id, + pair_id, + pool_listing_id, + created_pool, + created_pair, + created_listing, + }) + } + + async fn ensure_pump_swap_dex(&self) -> Result { + let dex_result = crate::get_dex_by_code(self.database.as_ref(), "pump_swap").await; + let dex_option = match dex_result { + Ok(dex_option) => dex_option, + Err(error) => return Err(error), + }; + match dex_option { + Some(dex) => match dex.id { + Some(dex_id) => Ok(dex_id), + None => Err(crate::KbError::InvalidState( + "pump_swap dex has no internal id".to_string(), + )), + }, + None => { + let dex_dto = crate::KbDexDto::new( + "pump_swap".to_string(), + "PumpSwap".to_string(), + Some(crate::KB_PUMP_SWAP_PROGRAM_ID.to_string()), + None, + true, + ); + crate::upsert_dex(self.database.as_ref(), &dex_dto).await + } + } + } + async fn ensure_pump_fun_dex(&self) -> Result { let dex_result = crate::get_dex_by_code(self.database.as_ref(), "pump_fun").await; let dex_option = match dex_result { @@ -1105,4 +1399,157 @@ mod tests { }; assert_eq!(pool_tokens.len(), 2); } + + async fn seed_decoded_pump_swap_event( + database: std::sync::Arc, + signature: &str, + ) { + let transaction_model = crate::KbTransactionModelService::new(database.clone()); + let dex_decode = crate::KbDexDecodeService::new(database); + let resolved_transaction = serde_json::json!({ + "slot": 910003, + "blockTime": 1779100003, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_PUMP_SWAP_PROGRAM_ID, + "program": "pump-amm", + "stackHeight": 1, + "accounts": [ + "PumpSwapDetectPool111", + "PumpSwapDetectTokenA111", + "So11111111111111111111111111111111111111112", + "PumpSwapDetectPoolV2_111" + ], + "parsed": { + "info": { + "pool": "PumpSwapDetectPool111", + "baseMint": "PumpSwapDetectTokenA111", + "quoteMint": "So11111111111111111111111111111111111111112", + "poolV2": "PumpSwapDetectPoolV2_111" + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: Buy" + ] + } + }); + let project_result = transaction_model + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = project_result { + panic!("projection must succeed: {}", error); + } + let decode_result = dex_decode.decode_transaction_by_signature(signature).await; + if let Err(error) = decode_result { + panic!("dex decode must succeed: {}", error); + } + } + + #[tokio::test] + async fn detect_transaction_by_signature_creates_pump_swap_pool_pair_and_listing() { + let database = make_database().await; + seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-1").await; + + let detect_service = crate::KbDexDetectService::new(database.clone()); + + let detect_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-pumpswap-1") + .await; + let results = match detect_result { + Ok(results) => results, + Err(error) => panic!("dex detect must succeed: {}", error), + }; + + assert_eq!(results.len(), 1); + assert!(results[0].created_pool); + assert!(results[0].created_pair); + assert!(results[0].created_listing); + + let pool_result = + crate::get_pool_by_address(database.as_ref(), "PumpSwapDetectPool111").await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => panic!("pool fetch must succeed: {}", error), + }; + let pool = match pool_option { + Some(pool) => pool, + None => panic!("pool must exist"), + }; + assert_eq!(pool.id, Some(results[0].pool_id)); + assert_eq!(pool.pool_kind, crate::KbPoolKind::Amm); + + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), results[0].pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + assert_eq!(pair.id, Some(results[0].pair_id)); + + let listing_result = + crate::get_pool_listing_by_pool_id(database.as_ref(), results[0].pool_id).await; + let listing_option = match listing_result { + Ok(listing_option) => listing_option, + Err(error) => panic!("listing fetch must succeed: {}", error), + }; + let listing = match listing_option { + Some(listing) => listing, + None => panic!("listing must exist"), + }; + assert_eq!(listing.id, results[0].pool_listing_id); + + let pool_tokens_result = + crate::list_pool_tokens_by_pool_id(database.as_ref(), results[0].pool_id).await; + let pool_tokens = match pool_tokens_result { + Ok(pool_tokens) => pool_tokens, + Err(error) => panic!("pool tokens list must succeed: {}", error), + }; + assert_eq!(pool_tokens.len(), 2); + } + + #[tokio::test] + async fn detect_transaction_by_signature_is_idempotent_for_pump_swap() { + let database = make_database().await; + seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-2").await; + let detect_service = crate::KbDexDetectService::new(database.clone()); + let first_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-pumpswap-2") + .await; + let first_results = match first_result { + Ok(first_results) => first_results, + Err(error) => panic!("first dex detect must succeed: {}", error), + }; + assert_eq!(first_results.len(), 1); + assert!(first_results[0].created_pool); + assert!(first_results[0].created_pair); + assert!(first_results[0].created_listing); + let second_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-pumpswap-2") + .await; + let second_results = match second_result { + Ok(second_results) => second_results, + Err(error) => panic!("second dex detect must succeed: {}", error), + }; + assert_eq!(second_results.len(), 1); + assert!(!second_results[0].created_pool); + assert!(!second_results[0].created_pair); + assert!(!second_results[0].created_listing); + } }