// file: kb_lib/src/trade_aggregation.rs //! Cross-DEX trade aggregation service. /// One trade-aggregation result. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TradeAggregationResult { /// Persisted trade-event id. pub trade_event_id: i64, /// Persisted pair-metric id. pub pair_metric_id: i64, /// Related pair id. pub pair_id: i64, /// Related pool id. pub pool_id: i64, /// Whether the trade-event row was newly created. pub created_trade_event: bool, } /// Trade-aggregation service. #[derive(Debug, Clone)] pub struct TradeAggregationService { database: std::sync::Arc, persistence: crate::DetectionPersistenceService, } impl TradeAggregationService { /// Creates a new trade-aggregation service. pub fn new(database: std::sync::Arc) -> Self { let persistence = crate::DetectionPersistenceService::new(database.clone()); return Self { database, persistence }; } /// Records normalized trade events and updates pair metrics for one transaction signature. pub async fn record_transaction_by_signature( &self, signature: &str, ) -> Result, crate::Error> { let transaction_context = crate::trade_aggregation_context::load_trade_aggregation_transaction_context( self.database.as_ref(), signature, ) .await; let transaction_context = match transaction_context { Ok(transaction_context) => transaction_context, Err(error) => return Err(error), }; let transaction = transaction_context.transaction; let transaction_id = transaction_context.transaction_id; let decoded_events = transaction_context.decoded_events; let mut results = std::vec::Vec::new(); for decoded_event in &decoded_events { if !crate::is_dex_trade_event_kind(decoded_event.event_kind.as_str()) { continue; } let event_context = crate::trade_aggregation_context::load_trade_aggregation_decoded_event_context( self.database.as_ref(), decoded_event, ) .await; let event_context = match event_context { Ok(Some(event_context)) => event_context, Ok(None) => continue, Err(error) => return Err(error), }; let decoded_event_id = event_context.decoded_event_id; let existing_trade_option = event_context.existing_trade_event; let pool_address = event_context.pool_address; let pool = event_context.pool; let pool_id = event_context.pool_id; let pair = event_context.pair; let pair_id = event_context.pair_id; let base_token_mint = event_context.base_token_mint; let base_token_decimals = event_context.base_token_decimals; let quote_token_mint = event_context.quote_token_mint; let quote_token_decimals = event_context.quote_token_decimals; let base_vault_address = event_context.base_vault_address; let quote_vault_address = event_context.quote_vault_address; let payload_result = serde_json::from_str::(decoded_event.payload_json.as_str()); let payload = match payload_result { Ok(payload) => payload, Err(error) => { tracing::warn!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, error = %error, "skipping decoded event with invalid payload_json" ); continue; }, }; if !crate::is_decoded_event_trade_candidate(decoded_event.event_kind.as_str(), &payload) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, "skipping non-trade decoded event" ); continue; } if !crate::is_decoded_event_candle_candidate( decoded_event.event_kind.as_str(), &payload, ) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, "skipping non-candle decoded trade candidate" ); continue; } let trade_side = crate::trade_side_resolution::extract_trade_side( decoded_event.event_kind.as_str(), &payload, ); let amount_input = crate::trade_amount_resolution::TradeAmountResolutionInput { database: self.database.as_ref(), transaction: &transaction, decoded_event, payload: &payload, pool_address: pool_address.as_str(), base_token_mint: base_token_mint.as_deref(), quote_token_mint: quote_token_mint.as_deref(), base_token_decimals, quote_token_decimals, base_vault_address: base_vault_address.as_deref(), quote_vault_address: quote_vault_address.as_deref(), }; let amount_resolution = crate::trade_amount_resolution::resolve_trade_amounts(&amount_input).await; let amount_resolution = match amount_resolution { Ok(amount_resolution) => amount_resolution, Err(error) => return Err(error), }; let base_amount_raw = amount_resolution.base_amount_raw.clone(); let quote_amount_raw = amount_resolution.quote_amount_raw.clone(); let price_quote_per_base = amount_resolution.price_quote_per_base; if !crate::trade_metric_update::is_priced_trade_event( base_amount_raw.as_deref(), quote_amount_raw.as_deref(), price_quote_per_base, ) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, transaction_signature = %transaction.signature, base_amount_raw = ?base_amount_raw, quote_amount_raw = ?quote_amount_raw, price_quote_per_base = ?price_quote_per_base, "skipping unpriced trade aggregation candidate" ); continue; } let materialization_input = crate::trade_event_materialization::TradeEventMaterializationInput { database: self.database.as_ref(), persistence: &self.persistence, transaction: &transaction, transaction_id, decoded_event, decoded_event_id, existing_trade_event: existing_trade_option, pool: &pool, pool_id, pair: &pair, pair_id, trade_side, amount_resolution: &amount_resolution, }; let materialization_result = crate::trade_event_materialization::materialize_trade_event(materialization_input) .await; let materialization_result = match materialization_result { Ok(materialization_result) => materialization_result, Err(error) => return Err(error), }; results.push(materialization_result); } return Ok(results); } } #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("trade_aggregation.sqlite3"); let config = crate::DatabaseConfig { enabled: true, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::Database::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => panic!("database init must succeed: {}", error), }; return std::sync::Arc::new(database); } async fn seed_fluxbeam_swap_transaction( database: std::sync::Arc, signature: &str, base_amount_raw: &str, quote_amount_raw: &str, ) { let transaction_model = crate::TransactionModelService::new(database.clone()); let dex_decode = crate::DexDecodeService::new(database.clone()); let dex_detect = crate::DexDetectService::new(database); let resolved_transaction = serde_json::json!({ "slot": 940001, "blockTime": 1779400001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::FLUXBEAM_PROGRAM_ID, "program": "fluxbeam", "stackHeight": 1, "accounts": [ "TradeAggPool111", "TradeAggLpMint111", "TradeAggTokenA111", crate::WSOL_MINT_ID ], "parsed": { "info": { "instruction": "swap", "pool": "TradeAggPool111", "tokenA": "TradeAggTokenA111", "tokenB": crate::WSOL_MINT_ID, "baseAmountRaw": base_amount_raw, "quoteAmountRaw": quote_amount_raw } }, "data": "opaque" } ] } }, "meta": { "err": null, "logMessages": [ "Program log: Instruction: Swap", "Program log: buy" ] } }); let project_result = transaction_model .persist_resolved_transaction( signature, Some("helius_primary_http".to_string()), &resolved_transaction, ) .await; if let Err(error) = project_result { panic!("projection must succeed: {}", error); } let decode_result = dex_decode.decode_transaction_by_signature(signature).await; if let Err(error) = decode_result { panic!("dex decode must succeed: {}", error); } let detect_result = dex_detect.detect_transaction_by_signature(signature).await; if let Err(error) = detect_result { panic!("dex detect must succeed: {}", error); } } #[tokio::test] async fn record_transaction_by_signature_creates_trade_event_and_pair_metric() { let database = make_database().await; seed_fluxbeam_swap_transaction(database.clone(), "sig-trade-aggregation-1", "1000", "2500") .await; let service = crate::TradeAggregationService::new(database.clone()); let record_result = service.record_transaction_by_signature("sig-trade-aggregation-1").await; let results = match record_result { Ok(results) => results, Err(error) => panic!("trade aggregation must succeed: {}", error), }; assert_eq!(results.len(), 1); assert!(results[0].created_trade_event); let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), results[0].pair_id).await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); assert_eq!(trade_events[0].trade_side, crate::SwapTradeSide::BuyBase); assert_eq!(trade_events[0].base_amount_raw, Some("1000".to_string())); assert_eq!(trade_events[0].quote_amount_raw, Some("2500".to_string())); assert_eq!(trade_events[0].price_quote_per_base, Some(2.5)); let pair_metric_result = crate::query_pair_metrics_get_by_pair_id(database.as_ref(), results[0].pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => panic!("pair metric fetch must succeed: {}", error), }; let pair_metric = match pair_metric_option { Some(pair_metric) => pair_metric, None => panic!("pair metric must exist"), }; assert_eq!(pair_metric.trade_count, 1); assert_eq!(pair_metric.buy_count, 1); assert_eq!(pair_metric.sell_count, 0); assert_eq!(pair_metric.cumulative_base_amount_raw, Some("1000".to_string())); assert_eq!(pair_metric.cumulative_quote_amount_raw, Some("2500".to_string())); assert_eq!(pair_metric.last_price_quote_per_base, Some(2.5)); } #[tokio::test] async fn record_transaction_by_signature_is_idempotent() { let database = make_database().await; seed_fluxbeam_swap_transaction(database.clone(), "sig-trade-aggregation-2", "1000", "2500") .await; let service = crate::TradeAggregationService::new(database.clone()); let first_result = service.record_transaction_by_signature("sig-trade-aggregation-2").await; let first_results = match first_result { Ok(first_results) => first_results, Err(error) => panic!("first trade aggregation must succeed: {}", error), }; assert_eq!(first_results.len(), 1); assert!(first_results[0].created_trade_event); let second_result = service.record_transaction_by_signature("sig-trade-aggregation-2").await; let second_results = match second_result { Ok(second_results) => second_results, Err(error) => panic!("second trade aggregation must succeed: {}", error), }; assert_eq!(second_results.len(), 1); assert!(!second_results[0].created_trade_event); let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), first_results[0].pair_id) .await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); let pair_metric_result = crate::query_pair_metrics_get_by_pair_id(database.as_ref(), first_results[0].pair_id) .await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => panic!("pair metric fetch must succeed: {}", error), }; let pair_metric = match pair_metric_option { Some(pair_metric) => pair_metric, None => panic!("pair metric must exist"), }; assert_eq!(pair_metric.trade_count, 1); } }