From b6ab7bab9ca6419aa8eb0b69d2d84f5b89e3846d Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Tue, 28 Apr 2026 11:29:39 +0200 Subject: [PATCH] 0.7.3 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 28 +- kb_lib/src/dex_detect.rs | 723 ++++++++++++++++++++++++++++++++++++ kb_lib/src/lib.rs | 3 + kb_lib/src/tx_resolution.rs | 16 +- 6 files changed, 757 insertions(+), 16 deletions(-) create mode 100644 kb_lib/src/dex_detect.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 867412a..cbfba29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,3 +33,4 @@ 0.7.0 - Ajout du socle de résolution transactionnelle orientée DEX : relais WS vers file de résolution, récupération getTransaction via HttpEndpointPool et persistance des résolutions dans les observations/signaux 0.7.1 - Ajout du modèle transactionnel enrichi : tables slots/transactions/instructions, requêtes d’accès et projection structurée des transactions résolues 0.7.2 - Ajout du premier décodeur DEX spécifique Raydium AmmV4 / initialize2, persistance des événements DEX décodés et branchement automatique du décodage après résolution/projection transactionnelle +0.7.3 - Ajout de la détection métier depuis les événements DEX décodés, avec alimentation de kb_pools, kb_pairs, kb_pool_tokens et kb_pool_listings, et signaux de première apparition diff --git a/Cargo.toml b/Cargo.toml index 4f48cb6..d3b58f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.2" +version = "0.7.3" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index e4bed5c..942898e 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -477,15 +477,15 @@ Réalisé : - branchement automatique du décodage DEX depuis le pipeline de résolution transactionnelle, - préparation de la future détection métier pool / pair / listing. -### 6.035. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction -Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RPC et des transactions enrichies. +### 6.034. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction +Réalisé : -À faire : - -- relier `logsSubscribe` + `signature` + `getTransaction`, -- détecter les créations de pools via motifs et instructions spécifiques par DEX, -- extraire token A, token B, LP mint, vaults et comptes utiles quand cela est possible, -- alimenter `kb_pools`, `kb_pairs`, `kb_pool_tokens` et `kb_pool_listings` avec des données plus fiables que la seule détection de comptes. +- transformation des événements DEX décodés en objets métier pool / pair / listing, +- alimentation de `kb_pools`, `kb_pairs`, `kb_pool_tokens` et `kb_pool_listings`, +- première détection métier pour Raydium AmmV4 / initialize2, +- branchement automatique de la détection métier après résolution, projection et décodage DEX, +- émission de signaux dédiés pour `new_pool`, `new_pair` et `first_listing_seen`, +- garantie d’idempotence sur une même transaction déjà traitée. ### 6.036. Version `0.7.4` — Modèle métier DEX enrichi Objectif : faire converger la détection technique et le modèle métier vers une vision proche de l’activité réelle du marché. @@ -655,11 +655,13 @@ Le projet doit maintenir au minimum : - les bindings TS générés via `cargo test export_bindings` lorsque les types partagés évoluent. ## 12. Priorité immédiate + La priorité immédiate est désormais la suivante : -1. démarrer la version `0.7.3` avec la détection métier à partir des événements DEX décodés, -2. transformer les événements DEX normalisés en objets métier de type pool, pair et listing, -3. identifier la première apparition locale d’un pool ou d’une paire, -4. relier les détections métier aux tokens et paires déjà connus ou à créer, +1. démarrer la version `0.7.4` avec l’enrichissement du modèle métier DEX, +2. rattacher plus finement une paire à son pool de création et à sa signature fondatrice, +3. préparer les événements métier enrichis de liquidité et de swaps à partir des objets déjà détectés, +4. consolider la vision `token <-> pool <-> pair <-> protocole`, 5. conserver le découplage entre résolution transactionnelle, projection, décodage DEX et détection métier, -6. préparer ensuite la version `0.7.4` pour l’enrichissement des événements de liquidité, de swaps et de suivi d’activité. +6. préparer ensuite la version `0.7.5` pour les wallets, holdings et participants observés. + diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs new file mode 100644 index 0000000..232d5bd --- /dev/null +++ b/kb_lib/src/dex_detect.rs @@ -0,0 +1,723 @@ +// file: kb_lib/src/dex_detect.rs + +//! Business-level detection built from decoded DEX events. + +/// Result of one business-level DEX pool detection. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbDexPoolDetectionResult { + /// Parent decoded event id. + pub decoded_event_id: i64, + /// DEX id. + pub dex_id: i64, + /// Pool id. + pub pool_id: i64, + /// Pair id. + pub pair_id: i64, + /// Optional pool listing id. + pub pool_listing_id: std::option::Option, + /// Whether the pool was newly created by this detection. + pub created_pool: bool, + /// Whether the pair was newly created by this detection. + pub created_pair: bool, + /// Whether the listing was newly created by this detection. + pub created_listing: bool, +} + +/// Business-level DEX detection service. +#[derive(Debug, Clone)] +pub struct KbDexDetectService { + database: std::sync::Arc, + persistence: crate::KbDetectionPersistenceService, +} + +impl KbDexDetectService { + /// Creates a new DEX detection service. + pub fn new(database: std::sync::Arc) -> Self { + let persistence = crate::KbDetectionPersistenceService::new(database.clone()); + Self { + database, + persistence, + } + } + + /// Detects business-level DEX objects from one transaction signature. + pub async fn detect_transaction_by_signature( + &self, + signature: &str, + ) -> Result, crate::KbError> { + let transaction_result = + crate::get_chain_transaction_by_signature(self.database.as_ref(), signature).await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => return Err(error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => { + return Err(crate::KbError::InvalidState(format!( + "cannot detect dex objects from unknown transaction '{}'", + signature + ))); + } + }; + let transaction_id_option = transaction.id; + let transaction_id = match transaction_id_option { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "transaction '{}' has no internal id", + signature + ))); + } + }; + let decoded_events_result = crate::list_dex_decoded_events_by_transaction_id( + self.database.as_ref(), + transaction_id, + ) + .await; + let decoded_events = match decoded_events_result { + Ok(decoded_events) => decoded_events, + Err(error) => return Err(error), + }; + let mut detection_results = std::vec::Vec::new(); + for decoded_event in &decoded_events { + if decoded_event.protocol_name == "raydium_amm_v4" + && decoded_event.event_kind == "raydium_amm_v4.initialize2_pool" + { + let detect_result = self + .detect_raydium_initialize2_pool(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } + } + Ok(detection_results) + } + + async fn detect_raydium_initialize2_pool( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbDexDecodedEventDto, + ) -> Result { + let decoded_event_id_option = decoded_event.id; + let decoded_event_id = match decoded_event_id_option { + Some(decoded_event_id) => decoded_event_id, + None => { + return Err(crate::KbError::InvalidState( + "decoded dex event has no internal id".to_string(), + )); + } + }; + let dex_id_result = self.ensure_raydium_dex().await; + let dex_id = match dex_id_result { + Ok(dex_id) => dex_id, + Err(error) => return Err(error), + }; + let pool_address_option = decoded_event.pool_account.clone(); + let pool_address = match pool_address_option { + Some(pool_address) => pool_address, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no pool_account", + decoded_event_id + ))); + } + }; + let token_a_mint_option = decoded_event.token_a_mint.clone(); + let token_a_mint = match token_a_mint_option { + Some(token_a_mint) => token_a_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_a_mint", + decoded_event_id + ))); + } + }; + let token_b_mint_option = decoded_event.token_b_mint.clone(); + let token_b_mint = match token_b_mint_option { + Some(token_b_mint) => token_b_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_b_mint", + decoded_event_id + ))); + } + }; + let lp_mint = decoded_event.lp_mint.clone(); + + let base_is_token_a = + kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str()); + let base_mint = if base_is_token_a { + token_a_mint.clone() + } else { + token_b_mint.clone() + }; + let quote_mint = if base_is_token_a { + token_b_mint.clone() + } else { + token_a_mint.clone() + }; + let base_token_id_result = self.ensure_token(base_mint.as_str()).await; + let base_token_id = match base_token_id_result { + Ok(base_token_id) => base_token_id, + Err(error) => return Err(error), + }; + let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; + let quote_token_id = match quote_token_id_result { + Ok(quote_token_id) => quote_token_id, + Err(error) => return Err(error), + }; + let lp_token_id = match lp_mint.clone() { + Some(lp_mint) => { + let lp_token_id_result = self.ensure_token(lp_mint.as_str()).await; + match lp_token_id_result { + Ok(lp_token_id) => Some(lp_token_id), + Err(error) => return Err(error), + } + } + None => None, + }; + let existing_pool_result = + crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; + let existing_pool_option = match existing_pool_result { + Ok(existing_pool_option) => existing_pool_option, + Err(error) => return Err(error), + }; + let created_pool = existing_pool_option.is_none(); + let pool_id = match existing_pool_option { + Some(pool) => { + let pool_id_option = pool.id; + match pool_id_option { + Some(pool_id) => pool_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pool '{}' has no internal id", + pool.address + ))); + } + } + } + None => { + let pool_dto = crate::KbPoolDto::new( + dex_id, + pool_address.clone(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ); + let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + match upsert_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + } + } + }; + let existing_pair_result = + crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_pair_option = match existing_pair_result { + Ok(existing_pair_option) => existing_pair_option, + Err(error) => return Err(error), + }; + let created_pair = existing_pair_option.is_none(); + let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); + let pair_id = match existing_pair_option { + Some(pair) => { + let pair_id_option = pair.id; + match pair_id_option { + Some(pair_id) => pair_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pair for pool '{}' has no internal id", + pool_id + ))); + } + } + } + None => { + let pair_dto = crate::KbPairDto::new( + dex_id, + pool_id, + base_token_id, + quote_token_id, + pair_symbol, + ); + let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + match upsert_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), + } + } + }; + let upsert_base_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + base_token_id, + crate::KbPoolTokenRole::Base, + None, + Some(0), + ), + ) + .await; + if let Err(error) = upsert_base_pool_token_result { + return Err(error); + } + let upsert_quote_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + quote_token_id, + crate::KbPoolTokenRole::Quote, + None, + Some(1), + ), + ) + .await; + if let Err(error) = upsert_quote_pool_token_result { + return Err(error); + } + if let Some(lp_token_id) = lp_token_id { + let upsert_lp_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + lp_token_id, + crate::KbPoolTokenRole::LpMint, + None, + None, + ), + ) + .await; + if let Err(error) = upsert_lp_pool_token_result { + return Err(error); + } + } + let existing_listing_result = + crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_listing_option = match existing_listing_result { + Ok(existing_listing_option) => existing_listing_option, + Err(error) => return Err(error), + }; + let created_listing = existing_listing_option.is_none(); + let pool_listing_id = match existing_listing_option { + Some(pool_listing) => pool_listing.id, + None => { + let listing_id_result = self + .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) + .await; + match listing_id_result { + Ok(listing_id) => Some(listing_id), + Err(error) => return Err(error), + } + } + }; + let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => return Err(error), + }; + if created_pool { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.new_pool", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_pair { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.new_pair", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_listing { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.first_listing_seen", + crate::KbAnalysisSignalSeverity::Low, + payload_value, + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(crate::KbDexPoolDetectionResult { + decoded_event_id, + dex_id, + pool_id, + pair_id, + pool_listing_id, + created_pool, + created_pair, + created_listing, + }) + } + + async fn ensure_raydium_dex(&self) -> Result { + let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium").await; + let dex_option = match dex_result { + Ok(dex_option) => dex_option, + Err(error) => return Err(error), + }; + match dex_option { + Some(dex) => match dex.id { + Some(dex_id) => Ok(dex_id), + None => Err(crate::KbError::InvalidState( + "raydium dex has no internal id".to_string(), + )), + }, + None => { + let dex_dto = crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + Some(crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID.to_string()), + None, + true, + ); + crate::upsert_dex(self.database.as_ref(), &dex_dto).await + } + } + } + + async fn ensure_token(&self, mint: &str) -> Result { + let token_result = crate::get_token_by_mint(self.database.as_ref(), mint).await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => return Err(error), + }; + match token_option { + Some(token) => match token.id { + Some(token_id) => Ok(token_id), + None => Err(crate::KbError::InvalidState(format!( + "token '{}' has no internal id", + mint + ))), + }, + None => { + let token_dto = crate::KbTokenDto::new( + mint.to_string(), + None, + None, + None, + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + kb_is_quote_mint(mint), + ); + crate::upsert_token(self.database.as_ref(), &token_dto).await + } + } + } + + async fn upsert_pool_listing_from_decoded_event( + &self, + dex_id: i64, + pool_id: i64, + pair_id: i64, + transaction: &crate::KbChainTransactionDto, + ) -> Result { + let listing_dto = crate::KbPoolListingDto::new( + dex_id, + pool_id, + Some(pair_id), + crate::KbObservationSourceKind::Dex, + transaction.source_endpoint_name.clone(), + None, + None, + None, + ); + crate::upsert_pool_listing(self.database.as_ref(), &listing_dto).await + } + + async fn record_detection_signal( + &self, + transaction: &crate::KbChainTransactionDto, + signal_kind: &str, + severity: crate::KbAnalysisSignalSeverity, + payload: serde_json::Value, + ) -> Result { + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "dex.business_detection".to_string(), + crate::KbObservationSourceKind::HttpRpc, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + self.persistence + .record_signal(&crate::KbDetectionSignalInput::new( + signal_kind.to_string(), + severity, + transaction.signature.clone(), + Some(observation_id), + None, + payload, + )) + .await + } +} + +fn kb_is_quote_mint(mint: &str) -> bool { + mint == "So11111111111111111111111111111111111111112" +} + +fn kb_choose_base_quote_order(token_a_mint: &str, token_b_mint: &str) -> bool { + let token_a_is_quote = kb_is_quote_mint(token_a_mint); + let token_b_is_quote = kb_is_quote_mint(token_b_mint); + if token_a_is_quote && !token_b_is_quote { + return false; + } + if token_b_is_quote && !token_a_is_quote { + return true; + } + true +} + +fn kb_build_pair_symbol( + base_mint: &str, + quote_mint: &str, +) -> std::option::Option { + let base_symbol = kb_symbol_hint_from_mint(base_mint); + let quote_symbol = kb_symbol_hint_from_mint(quote_mint); + + match (base_symbol, quote_symbol) { + (Some(base_symbol), Some(quote_symbol)) => Some(format!("{base_symbol}/{quote_symbol}")), + _ => None, + } +} + +fn kb_symbol_hint_from_mint(mint: &str) -> std::option::Option { + if mint == "So11111111111111111111111111111111111111112" { + return Some("WSOL".to_string()); + } + None +} + +fn kb_parse_payload_json(payload_json: &str) -> Result { + let parse_result = serde_json::from_str::(payload_json); + match parse_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse dex decoded event payload_json '{}': {}", + payload_json, error + ))), + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("dex_detect.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + async fn seed_decoded_raydium_event( + database: std::sync::Arc, + signature: &str, + ) { + let transaction_model = crate::KbTransactionModelService::new(database.clone()); + let dex_decode = crate::KbDexDecodeService::new(database); + let resolved_transaction = serde_json::json!({ + "slot": 910001, + "blockTime": 1779100001, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID, + "program": "raydium-amm-v4", + "stackHeight": 1, + "accounts": [ + "Account0", + "Account1", + "Account2", + "Account3", + "PoolDetect111", + "Account5", + "Account6", + "LpDetect111", + "TokenDetectA111", + "So11111111111111111111111111111111111111112", + "Account10", + "Account11", + "Account12", + "Account13", + "Account14", + "Account15", + "MarketDetect111" + ], + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: initialize2" + ] + } + }); + let project_result = transaction_model + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = project_result { + panic!("projection must succeed: {}", error); + } + let decode_result = dex_decode.decode_transaction_by_signature(signature).await; + if let Err(error) = decode_result { + panic!("dex decode must succeed: {}", error); + } + } + + #[tokio::test] + async fn detect_transaction_by_signature_creates_pool_pair_and_listing() { + let database = make_database().await; + seed_decoded_raydium_event(database.clone(), "sig-dex-detect-1").await; + let detect_service = crate::KbDexDetectService::new(database.clone()); + let detect_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-1") + .await; + let results = match detect_result { + Ok(results) => results, + Err(error) => panic!("dex detect must succeed: {}", error), + }; + assert_eq!(results.len(), 1); + assert!(results[0].created_pool); + assert!(results[0].created_pair); + assert!(results[0].created_listing); + let pool_result = crate::get_pool_by_address(database.as_ref(), "PoolDetect111").await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => panic!("pool fetch must succeed: {}", error), + }; + let pool = match pool_option { + Some(pool) => pool, + None => panic!("pool must exist"), + }; + assert_eq!(pool.id, Some(results[0].pool_id)); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), results[0].pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + assert_eq!(pair.id, Some(results[0].pair_id)); + let listing_result = + crate::get_pool_listing_by_pool_id(database.as_ref(), results[0].pool_id).await; + let listing_option = match listing_result { + Ok(listing_option) => listing_option, + Err(error) => panic!("listing fetch must succeed: {}", error), + }; + let listing = match listing_option { + Some(listing) => listing, + None => panic!("listing must exist"), + }; + assert_eq!(listing.id, results[0].pool_listing_id); + let pool_tokens_result = + crate::list_pool_tokens_by_pool_id(database.as_ref(), results[0].pool_id).await; + let pool_tokens = match pool_tokens_result { + Ok(pool_tokens) => pool_tokens, + Err(error) => panic!("pool tokens list must succeed: {}", error), + }; + assert_eq!(pool_tokens.len(), 3); + } + + #[tokio::test] + async fn detect_transaction_by_signature_is_idempotent() { + let database = make_database().await; + seed_decoded_raydium_event(database.clone(), "sig-dex-detect-2").await; + let detect_service = crate::KbDexDetectService::new(database.clone()); + let first_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-2") + .await; + let first_results = match first_result { + Ok(first_results) => first_results, + Err(error) => panic!("first dex detect must succeed: {}", error), + }; + assert_eq!(first_results.len(), 1); + assert!(first_results[0].created_pool); + assert!(first_results[0].created_pair); + assert!(first_results[0].created_listing); + let second_result = detect_service + .detect_transaction_by_signature("sig-dex-detect-2") + .await; + let second_results = match second_result { + Ok(second_results) => second_results, + Err(error) => panic!("second dex detect must succeed: {}", error), + }; + assert_eq!(second_results.len(), 1); + assert!(!second_results[0].created_pool); + assert!(!second_results[0].created_pair); + assert!(!second_results[0].created_listing); + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pools list must succeed: {}", error), + }; + assert_eq!(pools.len(), 1); + let pairs_result = crate::list_pairs(database.as_ref()).await; + let pairs = match pairs_result { + Ok(pairs) => pairs, + Err(error) => panic!("pairs list must succeed: {}", error), + }; + assert_eq!(pairs.len(), 1); + let listings_result = crate::list_pool_listings(database.as_ref()).await; + let listings = match listings_result { + Ok(listings) => listings, + Err(error) => panic!("listings list must succeed: {}", error), + }; + assert_eq!(listings.len(), 1); + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index eb243cf..e8a1425 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -25,6 +25,7 @@ mod tx_resolution; mod tx_model; mod dex; mod dex_decode; +mod dex_detect; pub use constants::*; pub use error::KbError; @@ -204,3 +205,5 @@ pub use dex::KbRaydiumAmmV4Decoder; pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded; pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; pub use dex_decode::KbDexDecodeService; +pub use dex_detect::KbDexDetectService; +pub use dex_detect::KbDexPoolDetectionResult; diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs index 6cc88f9..3cd7c8c 100644 --- a/kb_lib/src/tx_resolution.rs +++ b/kb_lib/src/tx_resolution.rs @@ -100,6 +100,7 @@ pub struct KbTransactionResolutionService { persistence: crate::KbDetectionPersistenceService, transaction_model: crate::KbTransactionModelService, dex_decode_service: crate::KbDexDecodeService, + dex_detect_service: crate::KbDexDetectService, http_role: std::string::String, resolved_signatures: std::sync::Arc>>, @@ -114,13 +115,14 @@ impl KbTransactionResolutionService { ) -> Self { let persistence = crate::KbDetectionPersistenceService::new(database.clone()); let transaction_model = crate::KbTransactionModelService::new(database.clone()); - let dex_decode_service = crate::KbDexDecodeService::new(database); - + let dex_decode_service = crate::KbDexDecodeService::new(database.clone()); + let dex_detect_service = crate::KbDexDetectService::new(database); Self { http_pool, persistence, transaction_model, dex_decode_service, + dex_detect_service, http_role, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), @@ -303,6 +305,15 @@ impl KbTransactionResolutionService { Err(error) => return Err(error), }; let decoded_event_count = decoded_events.len(); + let detection_results_result = self + .dex_detect_service + .detect_transaction_by_signature(request.signature.as_str()) + .await; + let detection_results = match detection_results_result { + Ok(detection_results) => detection_results, + Err(error) => return Err(error), + }; + let detected_object_count = detection_results.len(); let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), @@ -311,6 +322,7 @@ impl KbTransactionResolutionService { "slotHint": request.slot_hint, "projectedTransactionId": projected_transaction_id, "decodedEventCount": decoded_event_count, + "detectedObjectCount": detected_object_count, "triggerPayload": request.trigger_payload.clone(), "transaction": transaction_value });