// file: kb_lib/src/dex_detect.rs //! Business-level detection built from decoded DEX events. /// Result of one business-level DEX pool detection. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct KbDexPoolDetectionResult { /// Parent decoded event id. pub decoded_event_id: i64, /// DEX id. pub dex_id: i64, /// Pool id. pub pool_id: i64, /// Pair id. pub pair_id: i64, /// Optional pool listing id. pub pool_listing_id: std::option::Option, /// Whether the pool was newly created by this detection. pub created_pool: bool, /// Whether the pair was newly created by this detection. pub created_pair: bool, /// Whether the listing was newly created by this detection. pub created_listing: bool, } /// Business-level DEX detection service. #[derive(Debug, Clone)] pub struct KbDexDetectService { database: std::sync::Arc, persistence: crate::KbDetectionPersistenceService, } impl KbDexDetectService { /// Creates a new DEX detection service. pub fn new(database: std::sync::Arc) -> Self { let persistence = crate::KbDetectionPersistenceService::new(database.clone()); Self { database, persistence, } } /// Detects business-level DEX objects from one transaction signature. pub async fn detect_transaction_by_signature( &self, signature: &str, ) -> Result, crate::KbError> { let transaction_result = crate::get_chain_transaction_by_signature(self.database.as_ref(), signature).await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => return Err(error), }; let transaction = match transaction_option { Some(transaction) => transaction, None => { return Err(crate::KbError::InvalidState(format!( "cannot detect dex objects from unknown transaction '{}'", signature ))); } }; let transaction_id_option = transaction.id; let transaction_id = match transaction_id_option { Some(transaction_id) => transaction_id, None => { return Err(crate::KbError::InvalidState(format!( "transaction '{}' has no internal id", signature ))); } }; let decoded_events_result = crate::list_dex_decoded_events_by_transaction_id( self.database.as_ref(), transaction_id, ) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let mut detection_results = std::vec::Vec::new(); for decoded_event in &decoded_events { if decoded_event.protocol_name == "raydium_amm_v4" && decoded_event.event_kind == "raydium_amm_v4.initialize2_pool" { let detect_result = self .detect_raydium_initialize2_pool(&transaction, decoded_event) .await; let detect_result = match detect_result { Ok(detect_result) => detect_result, Err(error) => return Err(error), }; detection_results.push(detect_result); } } Ok(detection_results) } async fn detect_raydium_initialize2_pool( &self, transaction: &crate::KbChainTransactionDto, decoded_event: &crate::KbDexDecodedEventDto, ) -> Result { let decoded_event_id_option = decoded_event.id; let decoded_event_id = match decoded_event_id_option { Some(decoded_event_id) => decoded_event_id, None => { return Err(crate::KbError::InvalidState( "decoded dex event has no internal id".to_string(), )); } }; let dex_id_result = self.ensure_raydium_dex().await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let pool_address_option = decoded_event.pool_account.clone(); let pool_address = match pool_address_option { Some(pool_address) => pool_address, None => { return Err(crate::KbError::InvalidState(format!( "decoded event '{}' has no pool_account", decoded_event_id ))); } }; let token_a_mint_option = decoded_event.token_a_mint.clone(); let token_a_mint = match token_a_mint_option { Some(token_a_mint) => token_a_mint, None => { return Err(crate::KbError::InvalidState(format!( "decoded event '{}' has no token_a_mint", decoded_event_id ))); } }; let token_b_mint_option = decoded_event.token_b_mint.clone(); let token_b_mint = match token_b_mint_option { Some(token_b_mint) => token_b_mint, None => { return Err(crate::KbError::InvalidState(format!( "decoded event '{}' has no token_b_mint", decoded_event_id ))); } }; let lp_mint = decoded_event.lp_mint.clone(); let base_is_token_a = kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str()); let base_mint = if base_is_token_a { token_a_mint.clone() } else { token_b_mint.clone() }; let quote_mint = if base_is_token_a { token_b_mint.clone() } else { token_a_mint.clone() }; let base_token_id_result = self.ensure_token(base_mint.as_str()).await; let base_token_id = match base_token_id_result { Ok(base_token_id) => base_token_id, Err(error) => return Err(error), }; let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; let quote_token_id = match quote_token_id_result { Ok(quote_token_id) => quote_token_id, Err(error) => return Err(error), }; let lp_token_id = match lp_mint.clone() { Some(lp_mint) => { let lp_token_id_result = self.ensure_token(lp_mint.as_str()).await; match lp_token_id_result { Ok(lp_token_id) => Some(lp_token_id), Err(error) => return Err(error), } } None => None, }; let existing_pool_result = crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; let existing_pool_option = match existing_pool_result { Ok(existing_pool_option) => existing_pool_option, Err(error) => return Err(error), }; let created_pool = existing_pool_option.is_none(); let pool_id = match existing_pool_option { Some(pool) => { let pool_id_option = pool.id; match pool_id_option { Some(pool_id) => pool_id, None => { return Err(crate::KbError::InvalidState(format!( "pool '{}' has no internal id", pool.address ))); } } } None => { let pool_dto = crate::KbPoolDto::new( dex_id, pool_address.clone(), crate::KbPoolKind::Amm, crate::KbPoolStatus::Active, ); let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; match upsert_result { Ok(pool_id) => pool_id, Err(error) => return Err(error), } } }; let existing_pair_result = crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; let existing_pair_option = match existing_pair_result { Ok(existing_pair_option) => existing_pair_option, Err(error) => return Err(error), }; let created_pair = existing_pair_option.is_none(); let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); let pair_id = match existing_pair_option { Some(pair) => { let pair_id_option = pair.id; match pair_id_option { Some(pair_id) => pair_id, None => { return Err(crate::KbError::InvalidState(format!( "pair for pool '{}' has no internal id", pool_id ))); } } } None => { let pair_dto = crate::KbPairDto::new( dex_id, pool_id, base_token_id, quote_token_id, pair_symbol, ); let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; match upsert_result { Ok(pair_id) => pair_id, Err(error) => return Err(error), } } }; let upsert_base_pool_token_result = crate::upsert_pool_token( self.database.as_ref(), &crate::KbPoolTokenDto::new( pool_id, base_token_id, crate::KbPoolTokenRole::Base, None, Some(0), ), ) .await; if let Err(error) = upsert_base_pool_token_result { return Err(error); } let upsert_quote_pool_token_result = crate::upsert_pool_token( self.database.as_ref(), &crate::KbPoolTokenDto::new( pool_id, quote_token_id, crate::KbPoolTokenRole::Quote, None, Some(1), ), ) .await; if let Err(error) = upsert_quote_pool_token_result { return Err(error); } if let Some(lp_token_id) = lp_token_id { let upsert_lp_pool_token_result = crate::upsert_pool_token( self.database.as_ref(), &crate::KbPoolTokenDto::new( pool_id, lp_token_id, crate::KbPoolTokenRole::LpMint, None, None, ), ) .await; if let Err(error) = upsert_lp_pool_token_result { return Err(error); } } let existing_listing_result = crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; let existing_listing_option = match existing_listing_result { Ok(existing_listing_option) => existing_listing_option, Err(error) => return Err(error), }; let created_listing = existing_listing_option.is_none(); let pool_listing_id = match existing_listing_option { Some(pool_listing) => pool_listing.id, None => { let listing_id_result = self .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) .await; match listing_id_result { Ok(listing_id) => Some(listing_id), Err(error) => return Err(error), } } }; let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; if created_pool { let signal_result = self .record_detection_signal( transaction, "signal.dex.new_pool", crate::KbAnalysisSignalSeverity::Low, payload_value.clone(), ) .await; if let Err(error) = signal_result { return Err(error); } } if created_pair { let signal_result = self .record_detection_signal( transaction, "signal.dex.new_pair", crate::KbAnalysisSignalSeverity::Low, payload_value.clone(), ) .await; if let Err(error) = signal_result { return Err(error); } } if created_listing { let signal_result = self .record_detection_signal( transaction, "signal.dex.first_listing_seen", crate::KbAnalysisSignalSeverity::Low, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } } Ok(crate::KbDexPoolDetectionResult { decoded_event_id, dex_id, pool_id, pair_id, pool_listing_id, created_pool, created_pair, created_listing, }) } async fn ensure_raydium_dex(&self) -> Result { let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium").await; let dex_option = match dex_result { Ok(dex_option) => dex_option, Err(error) => return Err(error), }; match dex_option { Some(dex) => match dex.id { Some(dex_id) => Ok(dex_id), None => Err(crate::KbError::InvalidState( "raydium dex has no internal id".to_string(), )), }, None => { let dex_dto = crate::KbDexDto::new( "raydium".to_string(), "Raydium".to_string(), Some(crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID.to_string()), None, true, ); crate::upsert_dex(self.database.as_ref(), &dex_dto).await } } } async fn ensure_token(&self, mint: &str) -> Result { let token_result = crate::get_token_by_mint(self.database.as_ref(), mint).await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => return Err(error), }; match token_option { Some(token) => match token.id { Some(token_id) => Ok(token_id), None => Err(crate::KbError::InvalidState(format!( "token '{}' has no internal id", mint ))), }, None => { let token_dto = crate::KbTokenDto::new( mint.to_string(), None, None, None, crate::SPL_TOKEN_PROGRAM_ID.to_string(), kb_is_quote_mint(mint), ); crate::upsert_token(self.database.as_ref(), &token_dto).await } } } async fn upsert_pool_listing_from_decoded_event( &self, dex_id: i64, pool_id: i64, pair_id: i64, transaction: &crate::KbChainTransactionDto, ) -> Result { let listing_dto = crate::KbPoolListingDto::new( dex_id, pool_id, Some(pair_id), crate::KbObservationSourceKind::Dex, transaction.source_endpoint_name.clone(), None, None, None, ); crate::upsert_pool_listing(self.database.as_ref(), &listing_dto).await } async fn record_detection_signal( &self, transaction: &crate::KbChainTransactionDto, signal_kind: &str, severity: crate::KbAnalysisSignalSeverity, payload: serde_json::Value, ) -> Result { let observation_result = self .persistence .record_observation(&crate::KbDetectionObservationInput::new( "dex.business_detection".to_string(), crate::KbObservationSourceKind::HttpRpc, transaction.source_endpoint_name.clone(), transaction.signature.clone(), transaction.slot, payload.clone(), )) .await; let observation_id = match observation_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; self.persistence .record_signal(&crate::KbDetectionSignalInput::new( signal_kind.to_string(), severity, transaction.signature.clone(), Some(observation_id), None, payload, )) .await } } fn kb_is_quote_mint(mint: &str) -> bool { mint == "So11111111111111111111111111111111111111112" } fn kb_choose_base_quote_order(token_a_mint: &str, token_b_mint: &str) -> bool { let token_a_is_quote = kb_is_quote_mint(token_a_mint); let token_b_is_quote = kb_is_quote_mint(token_b_mint); if token_a_is_quote && !token_b_is_quote { return false; } if token_b_is_quote && !token_a_is_quote { return true; } true } fn kb_build_pair_symbol( base_mint: &str, quote_mint: &str, ) -> std::option::Option { let base_symbol = kb_symbol_hint_from_mint(base_mint); let quote_symbol = kb_symbol_hint_from_mint(quote_mint); match (base_symbol, quote_symbol) { (Some(base_symbol), Some(quote_symbol)) => Some(format!("{base_symbol}/{quote_symbol}")), _ => None, } } fn kb_symbol_hint_from_mint(mint: &str) -> std::option::Option { if mint == "So11111111111111111111111111111111111111112" { return Some("WSOL".to_string()); } None } fn kb_parse_payload_json(payload_json: &str) -> Result { let parse_result = serde_json::from_str::(payload_json); match parse_result { Ok(value) => Ok(value), Err(error) => Err(crate::KbError::Json(format!( "cannot parse dex decoded event payload_json '{}': {}", payload_json, error ))), } } #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("dex_detect.sqlite3"); let config = crate::KbDatabaseConfig { enabled: true, backend: crate::KbDatabaseBackend::Sqlite, sqlite: crate::KbSqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::KbDatabase::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => panic!("database init must succeed: {}", error), }; std::sync::Arc::new(database) } async fn seed_decoded_raydium_event( database: std::sync::Arc, signature: &str, ) { let transaction_model = crate::KbTransactionModelService::new(database.clone()); let dex_decode = crate::KbDexDecodeService::new(database); let resolved_transaction = serde_json::json!({ "slot": 910001, "blockTime": 1779100001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID, "program": "raydium-amm-v4", "stackHeight": 1, "accounts": [ "Account0", "Account1", "Account2", "Account3", "PoolDetect111", "Account5", "Account6", "LpDetect111", "TokenDetectA111", "So11111111111111111111111111111111111111112", "Account10", "Account11", "Account12", "Account13", "Account14", "Account15", "MarketDetect111" ], "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: initialize2" ] } }); let project_result = transaction_model .persist_resolved_transaction( signature, Some("helius_primary_http".to_string()), &resolved_transaction, ) .await; if let Err(error) = project_result { panic!("projection must succeed: {}", error); } let decode_result = dex_decode.decode_transaction_by_signature(signature).await; if let Err(error) = decode_result { panic!("dex decode must succeed: {}", error); } } #[tokio::test] async fn detect_transaction_by_signature_creates_pool_pair_and_listing() { let database = make_database().await; seed_decoded_raydium_event(database.clone(), "sig-dex-detect-1").await; let detect_service = crate::KbDexDetectService::new(database.clone()); let detect_result = detect_service .detect_transaction_by_signature("sig-dex-detect-1") .await; let results = match detect_result { Ok(results) => results, Err(error) => panic!("dex detect must succeed: {}", error), }; assert_eq!(results.len(), 1); assert!(results[0].created_pool); assert!(results[0].created_pair); assert!(results[0].created_listing); let pool_result = crate::get_pool_by_address(database.as_ref(), "PoolDetect111").await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => panic!("pool fetch must succeed: {}", error), }; let pool = match pool_option { Some(pool) => pool, None => panic!("pool must exist"), }; assert_eq!(pool.id, Some(results[0].pool_id)); let pair_result = crate::get_pair_by_pool_id(database.as_ref(), results[0].pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => panic!("pair fetch must succeed: {}", error), }; let pair = match pair_option { Some(pair) => pair, None => panic!("pair must exist"), }; assert_eq!(pair.id, Some(results[0].pair_id)); let listing_result = crate::get_pool_listing_by_pool_id(database.as_ref(), results[0].pool_id).await; let listing_option = match listing_result { Ok(listing_option) => listing_option, Err(error) => panic!("listing fetch must succeed: {}", error), }; let listing = match listing_option { Some(listing) => listing, None => panic!("listing must exist"), }; assert_eq!(listing.id, results[0].pool_listing_id); let pool_tokens_result = crate::list_pool_tokens_by_pool_id(database.as_ref(), results[0].pool_id).await; let pool_tokens = match pool_tokens_result { Ok(pool_tokens) => pool_tokens, Err(error) => panic!("pool tokens list must succeed: {}", error), }; assert_eq!(pool_tokens.len(), 3); } #[tokio::test] async fn detect_transaction_by_signature_is_idempotent() { let database = make_database().await; seed_decoded_raydium_event(database.clone(), "sig-dex-detect-2").await; let detect_service = crate::KbDexDetectService::new(database.clone()); let first_result = detect_service .detect_transaction_by_signature("sig-dex-detect-2") .await; let first_results = match first_result { Ok(first_results) => first_results, Err(error) => panic!("first dex detect must succeed: {}", error), }; assert_eq!(first_results.len(), 1); assert!(first_results[0].created_pool); assert!(first_results[0].created_pair); assert!(first_results[0].created_listing); let second_result = detect_service .detect_transaction_by_signature("sig-dex-detect-2") .await; let second_results = match second_result { Ok(second_results) => second_results, Err(error) => panic!("second dex detect must succeed: {}", error), }; assert_eq!(second_results.len(), 1); assert!(!second_results[0].created_pool); assert!(!second_results[0].created_pair); assert!(!second_results[0].created_listing); let pools_result = crate::list_pools(database.as_ref()).await; let pools = match pools_result { Ok(pools) => pools, Err(error) => panic!("pools list must succeed: {}", error), }; assert_eq!(pools.len(), 1); let pairs_result = crate::list_pairs(database.as_ref()).await; let pairs = match pairs_result { Ok(pairs) => pairs, Err(error) => panic!("pairs list must succeed: {}", error), }; assert_eq!(pairs.len(), 1); let listings_result = crate::list_pool_listings(database.as_ref()).await; let listings = match listings_result { Ok(listings) => listings, Err(error) => panic!("listings list must succeed: {}", error), }; assert_eq!(listings.len(), 1); } }