// file: kb_lib/src/pool_origin.rs //! Cross-DEX pool-origin recording service. /// One recorded pool-origin result. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct KbPoolOriginResult { /// Persisted pool-origin id. pub pool_origin_id: i64, /// Related pool id. pub pool_id: i64, /// Optional related pair id. pub pair_id: std::option::Option, /// Optional related pool listing id. pub pool_listing_id: std::option::Option, /// Whether the pool-origin row was newly created. pub created_origin: bool, } /// Pool-origin consolidation service. #[derive(Debug, Clone)] pub struct KbPoolOriginService { database: std::sync::Arc, persistence: crate::KbDetectionPersistenceService, } impl KbPoolOriginService { /// Creates a new pool-origin service. pub fn new(database: std::sync::Arc) -> Self { let persistence = crate::KbDetectionPersistenceService::new(database.clone()); Self { database, persistence, } } /// Records pool-origin rows for one resolved transaction signature. pub async fn record_transaction_by_signature( &self, signature: &str, ) -> Result, crate::KbError> { let transaction_result = crate::get_chain_transaction_by_signature(self.database.as_ref(), signature).await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => return Err(error), }; let transaction = match transaction_option { Some(transaction) => transaction, None => { return Err(crate::KbError::InvalidState(format!( "cannot record pool origins for unknown transaction '{}'", signature ))); } }; let transaction_id = match transaction.id { Some(transaction_id) => transaction_id, None => { return Err(crate::KbError::InvalidState(format!( "transaction '{}' has no internal id", signature ))); } }; let decoded_events_result = crate::list_dex_decoded_events_by_transaction_id( self.database.as_ref(), transaction_id, ) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let mut seen_pool_ids = std::collections::HashSet::::new(); let mut results = std::vec::Vec::new(); for decoded_event in &decoded_events { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => { return Err(crate::KbError::InvalidState( "decoded event has no internal id".to_string(), )); } }; let pool_address = match decoded_event.pool_account.clone() { Some(pool_address) => pool_address, None => continue, }; let pool_result = crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => return Err(error), }; let pool = match pool_option { Some(pool) => pool, None => continue, }; let pool_id = match pool.id { Some(pool_id) => pool_id, None => { return Err(crate::KbError::InvalidState(format!( "pool '{}' has no internal id", pool.address ))); } }; if seen_pool_ids.contains(&pool_id) { continue; } seen_pool_ids.insert(pool_id); let pair_result = crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => return Err(error), }; let pair_id = match pair_option { Some(pair) => pair.id, None => None, }; let listing_result = crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; let listing_option = match listing_result { Ok(listing_option) => listing_option, Err(error) => return Err(error), }; let pool_listing_id = match listing_option { Some(listing) => listing.id, None => None, }; let launch_attribution_result = crate::get_launch_attribution_by_decoded_event_id( self.database.as_ref(), decoded_event_id, ) .await; let launch_attribution_option = match launch_attribution_result { Ok(launch_attribution_option) => launch_attribution_option, Err(error) => return Err(error), }; let launch_attribution_id = match launch_attribution_option { Some(launch_attribution) => launch_attribution.id, None => None, }; let existing_result = crate::get_pool_origin_by_pool_id(self.database.as_ref(), pool_id).await; let existing_option = match existing_result { Ok(existing_option) => existing_option, Err(error) => return Err(error), }; let created_origin = existing_option.is_none(); let dto = crate::KbPoolOriginDto::new( pool.dex_id, pool_id, pair_id, pool_listing_id, transaction_id, decoded_event_id, transaction.signature.clone(), decoded_event.protocol_name.clone(), decoded_event.event_kind.clone(), crate::KbObservationSourceKind::HttpRpc, transaction.source_endpoint_name.clone(), launch_attribution_id, ); let upsert_result = crate::upsert_pool_origin(self.database.as_ref(), &dto).await; let pool_origin_id = match upsert_result { Ok(pool_origin_id) => pool_origin_id, Err(error) => return Err(error), }; if created_origin { let payload = serde_json::json!({ "poolId": pool_id, "pairId": pair_id, "poolListingId": pool_listing_id, "foundingSignature": transaction.signature, "protocolName": decoded_event.protocol_name, "eventKind": decoded_event.event_kind, "launchAttributionId": launch_attribution_id }); let observation_result = self .persistence .record_observation(&crate::KbDetectionObservationInput::new( "dex.pool_origin".to_string(), crate::KbObservationSourceKind::HttpRpc, transaction.source_endpoint_name.clone(), transaction.signature.clone(), transaction.slot, payload.clone(), )) .await; let observation_id = match observation_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_result = self .persistence .record_signal(&crate::KbDetectionSignalInput::new( "signal.dex.pool_origin.recorded".to_string(), crate::KbAnalysisSignalSeverity::Low, transaction.signature.clone(), Some(observation_id), None, payload, )) .await; if let Err(error) = signal_result { return Err(error); } } results.push(crate::KbPoolOriginResult { pool_origin_id, pool_id, pair_id, pool_listing_id, created_origin, }); } Ok(results) } } #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("pool_origin.sqlite3"); let config = crate::KbDatabaseConfig { enabled: true, backend: crate::KbDatabaseBackend::Sqlite, sqlite: crate::KbSqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::KbDatabase::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => panic!("database init must succeed: {}", error), }; std::sync::Arc::new(database) } async fn seed_bags_backed_meteora_dbc_transaction( database: std::sync::Arc, signature: &str, ) { let transaction_model = crate::KbTransactionModelService::new(database.clone()); let dex_decode = crate::KbDexDecodeService::new(database.clone()); let dex_detect = crate::KbDexDetectService::new(database.clone()); let launch_origin = crate::KbLaunchOriginService::new(database.clone()); let bags_register_result = launch_origin .register_bags_pool_mapping( "DbcOriginToken111".to_string().as_str(), Some("DbcOriginConfig111".to_string()), Some("DbcOriginPool111".to_string()), None, ) .await; if let Err(error) = bags_register_result { panic!("bags mapping registration must succeed: {}", error); } let resolved_transaction = serde_json::json!({ "slot": 920001, "blockTime": 1779200001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::KB_METEORA_DBC_PROGRAM_ID, "program": "meteora-dbc", "stackHeight": 1, "accounts": [ "DbcOriginPool111", "DbcOriginToken111", "So11111111111111111111111111111111111111112", "DbcOriginConfig111", "DbcOriginCreator111" ], "parsed": { "info": { "pool": "DbcOriginPool111", "baseMint": "DbcOriginToken111", "quoteMint": "So11111111111111111111111111111111111111112", "poolConfig": "DbcOriginConfig111", "creator": "DbcOriginCreator111" } }, "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: Instruction: CreatePool" ] } }); let project_result = transaction_model .persist_resolved_transaction( signature, Some("helius_primary_http".to_string()), &resolved_transaction, ) .await; if let Err(error) = project_result { panic!("projection must succeed: {}", error); } let decode_result = dex_decode.decode_transaction_by_signature(signature).await; if let Err(error) = decode_result { panic!("dex decode must succeed: {}", error); } let detect_result = dex_detect.detect_transaction_by_signature(signature).await; if let Err(error) = detect_result { panic!("dex detect must succeed: {}", error); } let attribution_result = launch_origin.attribute_transaction_by_signature(signature).await; if let Err(error) = attribution_result { panic!("launch attribution must succeed: {}", error); } } #[tokio::test] async fn record_transaction_by_signature_creates_pool_origin() { let database = make_database().await; seed_bags_backed_meteora_dbc_transaction(database.clone(), "sig-pool-origin-1").await; let service = crate::KbPoolOriginService::new(database.clone()); let record_result = service .record_transaction_by_signature("sig-pool-origin-1") .await; let results = match record_result { Ok(results) => results, Err(error) => panic!("pool-origin recording must succeed: {}", error), }; assert_eq!(results.len(), 1); assert!(results[0].created_origin); let fetched_result = crate::get_pool_origin_by_pool_id(database.as_ref(), results[0].pool_id).await; let fetched_option = match fetched_result { Ok(fetched_option) => fetched_option, Err(error) => panic!("pool-origin fetch must succeed: {}", error), }; let fetched = match fetched_option { Some(fetched) => fetched, None => panic!("pool origin must exist"), }; assert_eq!(fetched.pool_id, results[0].pool_id); assert_eq!(fetched.founding_signature, "sig-pool-origin-1".to_string()); assert_eq!(fetched.founding_protocol_name, "meteora_dbc".to_string()); assert_eq!(fetched.founding_event_kind, "meteora_dbc.create_pool".to_string()); assert!(fetched.launch_attribution_id.is_some()); } #[tokio::test] async fn record_transaction_by_signature_is_idempotent() { let database = make_database().await; seed_bags_backed_meteora_dbc_transaction(database.clone(), "sig-pool-origin-2").await; let service = crate::KbPoolOriginService::new(database.clone()); let first_result = service .record_transaction_by_signature("sig-pool-origin-2") .await; let first_results = match first_result { Ok(first_results) => first_results, Err(error) => panic!("first pool-origin recording must succeed: {}", error), }; assert_eq!(first_results.len(), 1); assert!(first_results[0].created_origin); let second_result = service .record_transaction_by_signature("sig-pool-origin-2") .await; let second_results = match second_result { Ok(second_results) => second_results, Err(error) => panic!("second pool-origin recording must succeed: {}", error), }; assert_eq!(second_results.len(), 1); assert!(!second_results[0].created_origin); let listed_result = crate::list_pool_origins(database.as_ref()).await; let listed = match listed_result { Ok(listed) => listed, Err(error) => panic!("pool-origin list must succeed: {}", error), }; assert_eq!(listed.len(), 1); } }