// file: kb_lib/src/dex_detect.rs //! Business-level detection built from decoded DEX events. /// Result of one business-level DEX pool detection. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DexPoolDetectionResult { /// Parent decoded event id. pub decoded_event_id: i64, /// DEX id. pub dex_id: i64, /// Pool id. pub pool_id: i64, /// Pair id. pub pair_id: i64, /// Optional pool listing id. pub pool_listing_id: std::option::Option, /// Whether the pool was newly created by this detection. pub created_pool: bool, /// Whether the pair was newly created by this detection. pub created_pair: bool, /// Whether the listing was newly created by this detection. pub created_listing: bool, } /// Business-level DEX detection service. #[derive(Debug, Clone)] pub struct DexDetectService { database: std::sync::Arc, persistence: crate::DetectionPersistenceService, } impl DexDetectService { /// Creates a new DEX detection service. pub fn new(database: std::sync::Arc) -> Self { let persistence = crate::DetectionPersistenceService::new(database.clone()); return Self { database, persistence }; } /// Detects business-level DEX objects from one transaction signature. 
pub async fn detect_transaction_by_signature( &self, signature: &str, ) -> Result, crate::Error> { let transaction_result = crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature) .await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => return Err(error), }; let transaction = match transaction_option { Some(transaction) => transaction, None => { return Err(crate::Error::InvalidState(format!( "cannot detect dex objects from unknown transaction '{}'", signature ))); }, }; let transaction_id_option = transaction.id; let transaction_id = match transaction_id_option { Some(transaction_id) => transaction_id, None => { return Err(crate::Error::InvalidState(format!( "transaction '{}' has no internal id", signature ))); }, }; let decoded_events_result = crate::query_dex_decoded_events_list_by_transaction_id( self.database.as_ref(), transaction_id, ) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let mut detection_results = std::vec::Vec::new(); for decoded_event in &decoded_events { let route_option = crate::dex_detection_route::dex_detection_route(decoded_event); let route = match route_option { Some(route) => route, None => continue, }; let detect_result = match route { crate::dex_detection_route::DexDetectionRoute::RaydiumAmmV4Initialize2Pool => { self.detect_raydium_initialize2_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::RaydiumCpmmTrade => { self.detect_raydium_cpmm_trade(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::RaydiumClmmTrade => { self.detect_raydium_clmm_trade(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::PumpFunCreateV2Token => { self.detect_pump_fun_create_v2_token(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::PumpFunTrade 
=> { self.detect_pump_fun_trade(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::PumpSwapTrade => { self.detect_pump_swap_trade(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::SkipIncompletePumpSwapTrade => { tracing::trace!( decoded_event_id = ?decoded_event.id, event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, "skipping incomplete pump_swap decoded event during detection" ); continue; }, crate::dex_detection_route::DexDetectionRoute::MeteoraDbcPool => { self.detect_meteora_dbc_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::MeteoraDlmmPool => { self.detect_meteora_dlmm_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::MeteoraDammV1Pool => { self.detect_meteora_damm_v1_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::MeteoraDammV2Pool => { self.detect_meteora_damm_v2_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::OrcaWhirlpoolsPool => { self.detect_orca_whirlpools_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::FluxbeamPool => { self.detect_fluxbeam_pool(&transaction, decoded_event).await }, crate::dex_detection_route::DexDetectionRoute::DexlabPool => { self.detect_dexlab_pool(&transaction, decoded_event).await }, }; let detect_result = match detect_result { Ok(detect_result) => detect_result, Err(error) => return Err(error), }; detection_results.push(detect_result); } return Ok(detection_results); } async fn detect_raydium_initialize2_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_amm_v4").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let 
input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::Amm, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, None, None, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn detect_pump_fun_create_v2_token( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_fun").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let token_mint_result = crate::dex_pool_materialization::required_token_a_mint(decoded_event); let token_mint = match token_mint_result { Ok(token_mint) => token_mint, Err(error) => return Err(error), }; let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event_with_mints( decoded_event, dex_id, token_mint, crate::WSOL_MINT_ID.to_string(), decoded_event.lp_mint.clone(), crate::PoolKind::BondingCurve, crate::PoolStatus::Pending, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, None, None, 
transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.pump_fun", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn detect_pump_fun_trade( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_fun").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let vault_addresses = extract_pump_fun_vault_addresses(&payload_value); let token_a_vault_address = vault_addresses.0; let token_b_vault_address = vault_addresses.1; let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::BondingCurve, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, token_a_vault_address, token_b_vault_address, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = 
crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.pump_fun", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn detect_pump_swap_trade( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_swap").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let vault_addresses = extract_pump_swap_vault_addresses(&payload_value); let token_a_vault_address = vault_addresses.0; let token_b_vault_address = vault_addresses.1; let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::Amm, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, token_a_vault_address, token_b_vault_address, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.pump_swap", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { 
return Err(error); } return Ok(detection_result); } async fn detect_meteora_dbc_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "meteora_dbc", crate::PoolKind::BondingCurve, crate::PoolStatus::Pending, "signal.dex.meteora_dbc", ) .await; } async fn detect_meteora_dlmm_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "meteora_dlmm").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let reserve_x_account = extract_payload_string_field(&payload_value, "reserveXAccount"); let reserve_y_account = extract_payload_string_field(&payload_value, "reserveYAccount"); let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::Clmm, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, reserve_x_account, reserve_y_account, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.meteora_dlmm", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return 
Ok(detection_result); } async fn detect_meteora_damm_v1_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "meteora_damm_v1", crate::PoolKind::Amm, crate::PoolStatus::Active, "signal.dex.meteora_damm_v1", ) .await; } async fn detect_meteora_damm_v2_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "meteora_damm_v2", crate::PoolKind::Amm, crate::PoolStatus::Active, "signal.dex.meteora_damm_v2", ) .await; } async fn detect_orca_whirlpools_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "orca_whirlpools", crate::PoolKind::Clmm, crate::PoolStatus::Active, "signal.dex.orca_whirlpools", ) .await; } async fn detect_fluxbeam_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "fluxbeam", crate::PoolKind::Amm, crate::PoolStatus::Active, "signal.dex.fluxbeam", ) .await; } async fn detect_dexlab_pool( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { return self .detect_materialized_pool_from_decoded_event( transaction, decoded_event, "dexlab", crate::PoolKind::Amm, crate::PoolStatus::Active, "signal.dex.dexlab", ) .await; } async fn detect_raydium_clmm_trade( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_clmm").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => 
return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let base_vault_address = extract_payload_string_field(&payload_value, "base_vault"); let quote_vault_address = extract_payload_string_field(&payload_value, "quote_vault"); let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::Clmm, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::AlreadyBaseQuote, base_vault_address, quote_vault_address, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.raydium_clmm", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn detect_raydium_cpmm_trade( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_cpmm").await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let payload_value_result = parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let base_vault_address = extract_payload_string_field(&payload_value, "base_vault"); let quote_vault_address = extract_payload_string_field(&payload_value, 
"quote_vault"); let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, crate::PoolKind::Amm, crate::PoolStatus::Active, crate::dex_pool_materialization::DexPoolTokenOrder::AlreadyBaseQuote, base_vault_address, quote_vault_address, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, "signal.dex.raydium_cpmm", &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn detect_materialized_pool_from_decoded_event( &self, transaction: &crate::ChainTransactionDto, decoded_event: &crate::DexDecodedEventDto, dex_code: &str, pool_kind: crate::PoolKind, pool_status: crate::PoolStatus, signal_prefix: &str, ) -> Result { let dex_id_result = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), dex_code).await; let dex_id = match dex_id_result { Ok(dex_id) => dex_id, Err(error) => return Err(error), }; let input_result = crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event( decoded_event, dex_id, pool_kind, pool_status, crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB, None, None, transaction.source_endpoint_name.clone(), ); let input = match input_result { Ok(input) => input, Err(error) => return Err(error), }; let detection_result = crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input) .await; let detection_result = match detection_result { Ok(detection_result) => detection_result, Err(error) => return Err(error), }; let payload_value_result = 
parse_payload_json(decoded_event.payload_json.as_str()); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => return Err(error), }; let signal_result = self .record_pool_detection_signals( transaction, signal_prefix, &detection_result, payload_value, ) .await; if let Err(error) = signal_result { return Err(error); } return Ok(detection_result); } async fn record_detection_signal( &self, transaction: &crate::ChainTransactionDto, signal_kind: &str, severity: crate::AnalysisSignalSeverity, payload: serde_json::Value, ) -> Result { let observation_result = self .persistence .record_observation(&crate::DetectionObservationInput::new( "dex.business_detection".to_string(), crate::ObservationSourceKind::HttpRpc, transaction.source_endpoint_name.clone(), transaction.signature.clone(), transaction.slot, payload.clone(), )) .await; let observation_id = match observation_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; return self .persistence .record_signal(&crate::DetectionSignalInput::new( signal_kind.to_string(), severity, transaction.signature.clone(), Some(observation_id), None, payload, )) .await; } async fn record_pool_detection_signals( &self, transaction: &crate::ChainTransactionDto, signal_prefix: &str, detection_result: &crate::DexPoolDetectionResult, payload: serde_json::Value, ) -> Result<(), crate::Error> { if detection_result.created_pool { let signal_kind = format!("{signal_prefix}.new_pool"); let signal_result = self .record_detection_signal( transaction, signal_kind.as_str(), crate::AnalysisSignalSeverity::Low, payload.clone(), ) .await; if let Err(error) = signal_result { return Err(error); } } if detection_result.created_pair { let signal_kind = format!("{signal_prefix}.new_pair"); let signal_result = self .record_detection_signal( transaction, signal_kind.as_str(), crate::AnalysisSignalSeverity::Low, payload.clone(), ) .await; if let Err(error) = signal_result { return 
Err(error); } } if detection_result.created_listing { let signal_kind = format!("{signal_prefix}.first_listing_seen"); let signal_result = self .record_detection_signal( transaction, signal_kind.as_str(), crate::AnalysisSignalSeverity::Low, payload, ) .await; if let Err(error) = signal_result { return Err(error); } } return Ok(()); } } fn parse_payload_json(payload_json: &str) -> Result { let parse_result = serde_json::from_str::(payload_json); match parse_result { Ok(value) => return Ok(value), Err(error) => { return Err(crate::Error::Json(format!( "cannot parse dex decoded event payload_json '{}': {}", payload_json, error ))); }, } } fn extract_string_from_array_index( values: &[serde_json::Value], index: usize, ) -> std::option::Option { if index >= values.len() { return None; } let value = &values[index]; let text_option = value.as_str(); let text = match text_option { Some(text) => text.trim(), None => return None, }; if text.is_empty() { return None; } return Some(text.to_string()); } fn extract_pump_fun_vault_addresses( payload_value: &serde_json::Value, ) -> ( std::option::Option, std::option::Option, ) { let accounts_option = payload_value.get("accounts"); let accounts = match accounts_option { Some(accounts) => accounts, None => return (None, None), }; let accounts_array_option = accounts.as_array(); let accounts_array = match accounts_array_option { Some(accounts_array) => accounts_array, None => return (None, None), }; let token_a_vault_address = extract_string_from_array_index(accounts_array, 4); let token_b_native_address = extract_string_from_array_index(accounts_array, 3); return (token_a_vault_address, token_b_native_address); } fn extract_pump_swap_vault_addresses( payload_value: &serde_json::Value, ) -> ( std::option::Option, std::option::Option, ) { let accounts_option = payload_value.get("accounts"); let accounts = match accounts_option { Some(accounts) => accounts, None => return (None, None), }; let accounts_array_option = 
accounts.as_array(); let accounts_array = match accounts_array_option { Some(accounts_array) => accounts_array, None => return (None, None), }; let token_a_vault_address = extract_string_from_array_index(accounts_array, 7); let token_b_vault_address = extract_string_from_array_index(accounts_array, 8); return (token_a_vault_address, token_b_vault_address); } fn extract_payload_string_field( payload_value: &serde_json::Value, field_name: &str, ) -> std::option::Option { let value_option = payload_value.get(field_name); let value = match value_option { Some(value) => value, None => return None, }; match value.as_str() { Some(value) => return Some(value.to_string()), None => return None, } } #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("dex_detect.sqlite3"); let config = crate::DatabaseConfig { enabled: true, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::Database::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => panic!("database init must succeed: {}", error), }; return std::sync::Arc::new(database); } async fn seed_decoded_raydium_event( database: std::sync::Arc, signature: &str, ) { let transaction_model = crate::TransactionModelService::new(database.clone()); let dex_decode = crate::DexDecodeService::new(database); let resolved_transaction = serde_json::json!({ "slot": 910001, "blockTime": 1779100001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::RAYDIUM_AMM_V4_PROGRAM_ID, "program": "raydium-amm-v4", 
"stackHeight": 1, "accounts": [ "Account0", "Account1", "Account2", "Account3", "PoolDetect111", "Account5", "Account6", "LpDetect111", "TokenDetectA111", crate::WSOL_MINT_ID, "Account10", "Account11", "Account12", "Account13", "Account14", "Account15", "MarketDetect111" ], "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: initialize2" ] } }); let project_result = transaction_model .persist_resolved_transaction( signature, Some("helius_primary_http".to_string()), &resolved_transaction, ) .await; if let Err(error) = project_result { panic!("projection must succeed: {}", error); } let decode_result = dex_decode.decode_transaction_by_signature(signature).await; if let Err(error) = decode_result { panic!("dex decode must succeed: {}", error); } } #[tokio::test] async fn detect_transaction_by_signature_creates_pool_pair_and_listing() { let database = make_database().await; seed_decoded_raydium_event(database.clone(), "sig-dex-detect-1").await; let detect_service = crate::DexDetectService::new(database.clone()); let detect_result = detect_service.detect_transaction_by_signature("sig-dex-detect-1").await; let results = match detect_result { Ok(results) => results, Err(error) => panic!("dex detect must succeed: {}", error), }; assert_eq!(results.len(), 1); assert!(results[0].created_pool); assert!(results[0].created_pair); assert!(results[0].created_listing); let pool_result = crate::query_pools_get_by_address(database.as_ref(), "PoolDetect111").await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => panic!("pool fetch must succeed: {}", error), }; let pool = match pool_option { Some(pool) => pool, None => panic!("pool must exist"), }; assert_eq!(pool.id, Some(results[0].pool_id)); let pair_result = crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => panic!("pair fetch must succeed: {}", error), }; 
// Continuation of `detect_transaction_by_signature_creates_pool_pair_and_listing`:
// the pair, listing and pool-token checks.
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        // The listing is looked up through the reported pool id as well.
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects three.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 3);
    }

    /// Runs detection twice on the same signature seeded by `seed_decoded_raydium_event`.
    ///
    /// The first run must report pool, pair and listing as newly created; the second run
    /// must still return one result but report none of them as created. Afterwards the
    /// pool, pair and listing list queries must each return exactly one entry.
    #[tokio::test]
    async fn detect_transaction_by_signature_is_idempotent() {
        let database = make_database().await;
        seed_decoded_raydium_event(database.clone(), "sig-dex-detect-2").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        // First run: everything is reported as created.
        let first_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-2").await;
        let first_results = match first_result {
            Ok(first_results) => first_results,
            Err(error) => panic!("first dex detect must succeed: {}", error),
        };
        assert_eq!(first_results.len(), 1);
        assert!(first_results[0].created_pool);
        assert!(first_results[0].created_pair);
        assert!(first_results[0].created_listing);
        // Second run on the same signature: nothing may be reported as created again.
        let second_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-2").await;
        let second_results = match second_result {
            Ok(second_results) => second_results,
            Err(error) => panic!("second dex detect must succeed: {}", error),
        };
        assert_eq!(second_results.len(), 1);
        assert!(!second_results[0].created_pool);
        assert!(!second_results[0].created_pair);
        assert!(!second_results[0].created_listing);
        // Each list query must still return a single entry after both runs.
        let pools_result = crate::query_pools_list(database.as_ref()).await;
        let pools = match
            pools_result {
            Ok(pools) => pools,
            Err(error) => panic!("pools list must succeed: {}", error),
        };
        assert_eq!(pools.len(), 1);
        let pairs_result = crate::query_pairs_list(database.as_ref()).await;
        let pairs = match pairs_result {
            Ok(pairs) => pairs,
            Err(error) => panic!("pairs list must succeed: {}", error),
        };
        assert_eq!(pairs.len(), 1);
        let listings_result = crate::query_pool_listings_list(database.as_ref()).await;
        let listings = match listings_result {
            Ok(listings) => listings,
            Err(error) => panic!("listings list must succeed: {}", error),
        };
        assert_eq!(listings.len(), 1);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::PUMP_FUN_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction and the log line
    /// `Program log: Instruction: CreateV2`, persists it under `signature` through
    /// `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_pump_fun_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910002,
            "blockTime": 1779100002,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::PUMP_FUN_PROGRAM_ID,
                            "program": "pump",
                            "stackHeight": 1,
                            "accounts": [
                                "MintPumpDetect111",
                                "MintAuthority111",
                                "BondingCurveDetect111",
                                "AssociatedBondingCurveDetect111",
                                "Global111",
                                "CreatorDetect111",
                                "System111",
                                "Token2022Program111",
                                "AtaProgram111"
                            ],
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: CreateV2"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_pump_fun_event` and checks
    /// the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `BondingCurveDetect111` must carry the
    /// reported id and the kind `PoolKind::BondingCurve`; the pair and listing must match
    /// the reported ids, and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_pump_fun_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_pump_fun_event(database.clone(), "sig-dex-detect-pump-1").await;
        let detect_service =
// Continuation of `detect_transaction_by_signature_creates_pump_fun_pool_pair_and_listing`:
// builds the service, runs detection and verifies the persisted rows.
            crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-pump-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "BondingCurveDetect111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::BondingCurve);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::PUMP_SWAP_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (carrying a `parsed`
    /// section naming the pool, base mint, quote mint and `poolV2`) and the log line
    /// `Program log: Instruction: Buy`, persists it under `signature` through
    /// `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_pump_swap_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let
            transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910003,
            "blockTime": 1779100003,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::PUMP_SWAP_PROGRAM_ID,
                            "program": "pump-amm",
                            "stackHeight": 1,
                            "accounts": [
                                "PumpSwapDetectPool111",
                                "PumpSwapDetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "PumpSwapDetectPoolV2_111"
                            ],
                            "parsed": {
                                "info": {
                                    "pool": "PumpSwapDetectPool111",
                                    "baseMint": "PumpSwapDetectTokenA111",
                                    "quoteMint": crate::WSOL_MINT_ID,
                                    "poolV2": "PumpSwapDetectPoolV2_111"
                                }
                            },
                            "data": "AJTQ2h9DXrBfqJi53PDQG2Fvki5tkaTU3"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: Buy"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_pump_swap_event` and checks
    /// the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `PumpSwapDetectPool111` must carry the
    /// reported id and the kind `PoolKind::Amm`; the pair and listing must match the
    /// reported ids, and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_pump_swap_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result = detect_service
            .detect_transaction_by_signature("sig-dex-detect-pumpswap-1")
            .await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by the address used in the seeded instruction.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "PumpSwapDetectPool111").await;
        let pool_option = match pool_result {
// Continuation of `detect_transaction_by_signature_creates_pump_swap_pool_pair_and_listing`:
// the arms of the pool-fetch `match`, followed by the pool, pair, listing and token checks.
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Amm);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Runs detection twice on the same signature seeded by `seed_decoded_pump_swap_event`.
    ///
    /// The first run must report pool, pair and listing as newly created; the second run
    /// must still return one result but report none of them as created.
    #[tokio::test]
    async fn detect_transaction_by_signature_is_idempotent_for_pump_swap() {
        let database = make_database().await;
        seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-2").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        // First run: everything is reported as created.
        let first_result = detect_service
            .detect_transaction_by_signature("sig-dex-detect-pumpswap-2")
            .await;
        let first_results = match first_result {
            Ok(first_results) => first_results,
            Err(error) => panic!("first dex detect must succeed: {}", error),
        };
        assert_eq!(first_results.len(), 1);
        assert!(first_results[0].created_pool);
        assert!(first_results[0].created_pair);
        assert!(first_results[0].created_listing);
        // Second run on the same signature: nothing may be reported as created again.
        let second_result = detect_service
            .detect_transaction_by_signature("sig-dex-detect-pumpswap-2")
            .await;
        let second_results = match second_result {
            Ok(second_results) => second_results,
            Err(error) => panic!("second dex detect must succeed: {}", error),
        };
        assert_eq!(second_results.len(), 1);
        assert!(!second_results[0].created_pool);
        assert!(!second_results[0].created_pair);
        assert!(!second_results[0].created_listing);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::METEORA_DBC_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (carrying a `parsed`
    /// section naming the pool, base mint, quote mint, pool config and creator) and the
    /// log line `Program log: Instruction: CreatePool`, persists it under `signature`
    /// through `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_meteora_dbc_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910004,
            "blockTime": 1779100004,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::METEORA_DBC_PROGRAM_ID,
                            "program": "meteora-dbc",
                            "stackHeight": 1,
                            "accounts": [
                                "DbcDetectPool111",
                                "DbcDetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "DbcDetectConfig111",
                                "DbcDetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "pool": "DbcDetectPool111",
                                    "baseMint": "DbcDetectTokenA111",
                                    "quoteMint": crate::WSOL_MINT_ID,
                                    "poolConfig": "DbcDetectConfig111",
                                    "creator": "DbcDetectCreator111"
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: CreatePool"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_meteora_dbc_event` and
    /// checks the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `DbcDetectPool111` must carry the reported id
    /// and the kind `PoolKind::BondingCurve`; the pair and listing must match the
    /// reported ids, and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_meteora_dbc_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_meteora_dbc_event(database.clone(), "sig-dex-detect-dbc-1").await;
        let
// Continuation of `detect_transaction_by_signature_creates_meteora_dbc_pool_pair_and_listing`:
// builds the service, runs detection and verifies the persisted rows.
            detect_service = crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-dbc-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "DbcDetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::BondingCurve);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::METEORA_DAMM_V2_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (its `parsed` section
    /// names the instruction `initialize_customizable_pool`) and the log line
    /// `Program log: Instruction: InitializeCustomizablePool`, persists it under
    /// `signature` through `TransactionModelService::persist_resolved_transaction`, then
    /// runs `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_meteora_damm_v2_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910005,
            "blockTime": 1779100005,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::METEORA_DAMM_V2_PROGRAM_ID,
                            "program": "meteora-damm-v2",
                            "stackHeight": 1,
                            "accounts": [
                                "DammV2DetectPool111",
                                "DammV2DetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "DammV2DetectConfig111",
                                "DammV2DetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "instruction": "initialize_customizable_pool",
                                    "pool": "DammV2DetectPool111",
                                    "tokenAMint": "DammV2DetectTokenA111",
                                    "tokenBMint": crate::WSOL_MINT_ID,
                                    "creator": "DammV2DetectCreator111",
                                    "isCustomizablePool": true
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: InitializeCustomizablePool"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_meteora_damm_v2_event` and
    /// checks the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `DammV2DetectPool111` must carry the reported
    /// id and the kind `PoolKind::Amm`; the pair and listing must match the reported ids,
    /// and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_meteora_damm_v2_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_meteora_damm_v2_event(database.clone(), "sig-dex-detect-dammv2-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-dammv2-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by the address used in the seeded instruction.
        let pool_result =
// Continuation of
// `detect_transaction_by_signature_creates_meteora_damm_v2_pool_pair_and_listing`:
// the pool lookup expression, followed by the pool, pair, listing and token checks.
            crate::query_pools_get_by_address(database.as_ref(), "DammV2DetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Amm);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::METEORA_DAMM_V1_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (its `parsed` section
    /// names the instruction `initialize_pool_with_config`) and the log line
    /// `Program log: Instruction: InitializePoolWithConfig`, persists it under
    /// `signature` through `TransactionModelService::persist_resolved_transaction`, then
    /// runs `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_meteora_damm_v1_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910006,
            "blockTime": 1779100006,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::METEORA_DAMM_V1_PROGRAM_ID,
                            "program": "meteora-damm-v1",
                            "stackHeight": 1,
                            "accounts": [
                                "DammV1DetectPool111",
                                "DammV1DetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "DammV1DetectConfig111",
                                "DammV1DetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "instruction": "initialize_pool_with_config",
                                    "pool": "DammV1DetectPool111",
                                    "tokenAMint": "DammV1DetectTokenA111",
                                    "tokenBMint": crate::WSOL_MINT_ID,
                                    "config": "DammV1DetectConfig111",
                                    "creator": "DammV1DetectCreator111"
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: InitializePoolWithConfig"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_meteora_damm_v1_event` and
    /// checks the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `DammV1DetectPool111` must carry the reported
    /// id and the kind `PoolKind::Amm`; the pair and listing must match the reported ids,
    /// and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_meteora_damm_v1_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_meteora_damm_v1_event(database.clone(), "sig-dex-detect-dammv1-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-dammv1-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "DammV1DetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Amm);
        // The pair is looked up through the reported pool id.
        let pair_result =
// Continuation of
// `detect_transaction_by_signature_creates_meteora_damm_v1_pool_pair_and_listing`:
// the pair lookup expression, followed by the pair, listing and token checks.
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        // The listing is looked up through the reported pool id as well.
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::ORCA_WHIRLPOOLS_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (its `parsed` section
    /// names the instruction `initialize_pool_v2`) and the log line
    /// `Program log: Instruction: InitializePoolV2`, persists it under `signature`
    /// through `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_orca_whirlpools_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910007,
            "blockTime": 1779100007,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::ORCA_WHIRLPOOLS_PROGRAM_ID,
                            "program": "orca-whirlpools",
                            "stackHeight": 1,
                            "accounts": [
                                "OrcaDetectPool111",
                                "OrcaDetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "OrcaDetectConfig111",
                                "OrcaDetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "instruction": "initialize_pool_v2",
                                    "whirlpool": "OrcaDetectPool111",
                                    "tokenMintA": "OrcaDetectTokenA111",
                                    "tokenMintB": crate::WSOL_MINT_ID,
                                    "whirlpoolsConfig": "OrcaDetectConfig111",
                                    "funder": "OrcaDetectCreator111",
                                    "tokenProgramA": crate::SPL_TOKEN_PROGRAM_ID,
                                    "tokenProgramB": crate::SPL_TOKEN_PROGRAM_ID
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: InitializePoolV2"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_orca_whirlpools_event` and
    /// checks the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `OrcaDetectPool111` must carry the reported
    /// id and the kind `PoolKind::Clmm`; the pair and listing must match the reported
    /// ids, and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_orca_whirlpools_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_orca_whirlpools_event(database.clone(), "sig-dex-detect-orca-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-orca-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "OrcaDetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Clmm);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
// Continuation of
// `detect_transaction_by_signature_creates_orca_whirlpools_pool_pair_and_listing`:
// the listing lookup expression, followed by the listing and token checks.
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::FLUXBEAM_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (its `parsed` section
    /// names the instruction `create_pool` and includes an `lpMint` entry) and the log
    /// line `Program log: Instruction: CreatePool`, persists it under `signature` through
    /// `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    async fn seed_decoded_fluxbeam_event(
        // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
        // parameter type against the original file.
        database: std::sync::Arc,
        signature: &str,
    ) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910008,
            "blockTime": 1779100008,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::FLUXBEAM_PROGRAM_ID,
                            "program": "fluxbeam",
                            "stackHeight": 1,
                            "accounts": [
                                "FluxDetectPool111",
                                "FluxDetectLpMint111",
                                "FluxDetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "FluxDetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "instruction": "create_pool",
                                    "pool": "FluxDetectPool111",
                                    "lpMint": "FluxDetectLpMint111",
                                    "tokenA": "FluxDetectTokenA111",
                                    "tokenB": crate::WSOL_MINT_ID,
                                    "payer": "FluxDetectCreator111"
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: CreatePool"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_fluxbeam_event` and checks
    /// the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `FluxDetectPool111` must carry the reported
    /// id and the kind `PoolKind::Amm`; the pair and listing must match the reported ids,
    /// and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_fluxbeam_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_fluxbeam_event(database.clone(), "sig-dex-detect-fluxbeam-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result = detect_service
            .detect_transaction_by_signature("sig-dex-detect-fluxbeam-1")
            .await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "FluxDetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Amm);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match
// Continuation of `detect_transaction_by_signature_creates_fluxbeam_pool_pair_and_listing`:
// the pool-token list must succeed and contain two entries.
            pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }

    /// Seeds the database with a synthetic transaction addressed to
    /// `crate::DEXLAB_PROGRAM_ID` and decodes it.
    ///
    /// Builds a resolved-transaction JSON with a single instruction (its `parsed` section
    /// names the instruction `create_pool` and includes a `feeTier` entry) and the log
    /// line `Program log: Instruction: CreatePool`, persists it under `signature` through
    /// `TransactionModelService::persist_resolved_transaction`, then runs
    /// `DexDecodeService::decode_transaction_by_signature` on the same signature.
    ///
    /// # Panics
    ///
    /// Panics if the persistence step or the decode step returns an error.
    // NOTE(review): `std::sync::Arc` shows no type argument in this view — confirm the
    // `database` parameter type against the original file.
    async fn seed_decoded_dexlab_event(database: std::sync::Arc, signature: &str) {
        let transaction_model = crate::TransactionModelService::new(database.clone());
        let dex_decode = crate::DexDecodeService::new(database);
        let resolved_transaction = serde_json::json!({
            "slot": 910009,
            "blockTime": 1779100009,
            "version": 0,
            "transaction": {
                "message": {
                    "instructions": [
                        {
                            "programId": crate::DEXLAB_PROGRAM_ID,
                            "program": "dexlab",
                            "stackHeight": 1,
                            "accounts": [
                                "DexlabDetectPool111",
                                "DexlabDetectTokenA111",
                                crate::WSOL_MINT_ID,
                                "DexlabDetectCreator111"
                            ],
                            "parsed": {
                                "info": {
                                    "instruction": "create_pool",
                                    "pool": "DexlabDetectPool111",
                                    "tokenA": "DexlabDetectTokenA111",
                                    "tokenB": crate::WSOL_MINT_ID,
                                    "payer": "DexlabDetectCreator111",
                                    "feeTier": "0.3%"
                                }
                            },
                            "data": "opaque"
                        }
                    ]
                }
            },
            "meta": {
                "err": null,
                "logMessages": [
                    "Program log: Instruction: CreatePool"
                ]
            }
        });
        // Persist the fixture under `signature`; panic if persistence fails.
        let project_result = transaction_model
            .persist_resolved_transaction(
                signature,
                Some("helius_primary_http".to_string()),
                &resolved_transaction,
            )
            .await;
        if let Err(error) = project_result {
            panic!("projection must succeed: {}", error);
        }
        // Decode the persisted transaction; panic if decoding fails.
        let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
        if let Err(error) = decode_result {
            panic!("dex decode must succeed: {}", error);
        }
    }

    /// Runs detection on a transaction seeded by `seed_decoded_dexlab_event` and checks
    /// the rows it leaves behind.
    ///
    /// Expects exactly one detection result with pool, pair and listing all flagged as
    /// newly created. The pool stored under `DexlabDetectPool111` must carry the reported
    /// id and the kind `PoolKind::Amm`; the pair and listing must match the reported ids,
    /// and two pool tokens must be listed for the pool.
    #[tokio::test]
    async fn detect_transaction_by_signature_creates_dexlab_pool_pair_and_listing() {
        let database = make_database().await;
        seed_decoded_dexlab_event(database.clone(), "sig-dex-detect-dexlab-1").await;
        let detect_service = crate::DexDetectService::new(database.clone());
        let detect_result =
            detect_service.detect_transaction_by_signature("sig-dex-detect-dexlab-1").await;
        let results = match detect_result {
            Ok(results) => results,
            Err(error) => panic!("dex detect must succeed: {}", error),
        };
        // Exactly one detection result, with every entity reported as newly created.
        assert_eq!(results.len(), 1);
        assert!(results[0].created_pool);
        assert!(results[0].created_pair);
        assert!(results[0].created_listing);
        // Fetch the pool by address; it must carry the reported id and the expected kind.
        let pool_result =
            crate::query_pools_get_by_address(database.as_ref(), "DexlabDetectPool111").await;
        let pool_option = match pool_result {
            Ok(pool_option) => pool_option,
            Err(error) => panic!("pool fetch must succeed: {}", error),
        };
        let pool = match pool_option {
            Some(pool) => pool,
            None => panic!("pool must exist"),
        };
        assert_eq!(pool.id, Some(results[0].pool_id));
        assert_eq!(pool.pool_kind, crate::PoolKind::Amm);
        // The pair and the listing are looked up through the reported pool id.
        let pair_result =
            crate::query_pairs_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pair_option = match pair_result {
            Ok(pair_option) => pair_option,
            Err(error) => panic!("pair fetch must succeed: {}", error),
        };
        let pair = match pair_option {
            Some(pair) => pair,
            None => panic!("pair must exist"),
        };
        assert_eq!(pair.id, Some(results[0].pair_id));
        let listing_result =
            crate::query_pool_listings_get_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let listing_option = match listing_result {
            Ok(listing_option) => listing_option,
            Err(error) => panic!("listing fetch must succeed: {}", error),
        };
        let listing = match listing_option {
            Some(listing) => listing,
            None => panic!("listing must exist"),
        };
        assert_eq!(listing.id, results[0].pool_listing_id);
        // Count the pool tokens recorded for the pool; this fixture expects two.
        let pool_tokens_result =
            crate::query_pool_tokens_list_by_pool_id(database.as_ref(), results[0].pool_id).await;
        let pool_tokens = match pool_tokens_result {
            Ok(pool_tokens) => pool_tokens,
            Err(error) => panic!("pool tokens list must succeed: {}", error),
        };
        assert_eq!(pool_tokens.len(), 2);
    }
// NOTE(review): this brace closes a scope opened before this view (presumably the
// enclosing test module).
}