// file: kb_lib/src/trade_aggregation.rs //! Cross-DEX trade aggregation service. /// One trade-aggregation result. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TradeAggregationResult { /// Persisted trade-event id. pub trade_event_id: i64, /// Persisted pair-metric id. pub pair_metric_id: i64, /// Related pair id. pub pair_id: i64, /// Related pool id. pub pool_id: i64, /// Whether the trade-event row was newly created. pub created_trade_event: bool, } /// Trade-aggregation service. #[derive(Debug, Clone)] pub struct TradeAggregationService { database: std::sync::Arc, persistence: crate::DetectionPersistenceService, } impl TradeAggregationService { /// Creates a new trade-aggregation service. pub fn new(database: std::sync::Arc) -> Self { let persistence = crate::DetectionPersistenceService::new(database.clone()); return Self { database, persistence }; } /// Records normalized trade events and updates pair metrics for one transaction signature. pub async fn record_transaction_by_signature( &self, signature: &str, ) -> Result, crate::Error> { let transaction_context = crate::trade_aggregation_context::load_trade_aggregation_transaction_context( self.database.as_ref(), signature, ) .await; let transaction_context = match transaction_context { Ok(transaction_context) => transaction_context, Err(error) => return Err(error), }; let transaction = transaction_context.transaction; if crate::trade_aggregation::transaction_has_effective_error(&transaction) { tracing::debug!( signature = %transaction.signature, err_json = ?transaction.err_json, "skipping trade aggregation for failed transaction" ); return Ok(std::vec::Vec::new()); } let transaction_id = transaction_context.transaction_id; let decoded_events = transaction_context.decoded_events; let mut results = std::vec::Vec::new(); for decoded_event in &decoded_events { if !crate::is_dex_trade_event_kind(decoded_event.event_kind.as_str()) { continue; } if crate::trade_aggregation::should_skip_pump_fun_duplicate_trade_event( decoded_event, &decoded_events, ) { tracing::debug!( event_kind = %decoded_event.event_kind, decoded_event_id = ?decoded_event.id, transaction_signature = %transaction.signature, "skipping duplicate pump_fun trade_event because an instruction trade exists" ); continue; } let event_context = crate::trade_aggregation_context::load_trade_aggregation_decoded_event_context( self.database.as_ref(), decoded_event, ) .await; let event_context = match event_context { Ok(Some(event_context)) => event_context, Ok(None) => continue, Err(error) => return Err(error), }; let decoded_event_id = event_context.decoded_event_id; let existing_trade_option = event_context.existing_trade_event; let pool_address = event_context.pool_address; let pool = event_context.pool; let pool_id = event_context.pool_id; let pair = event_context.pair; let pair_id = event_context.pair_id; let base_token_mint = event_context.base_token_mint; let base_token_decimals = event_context.base_token_decimals; let quote_token_mint = event_context.quote_token_mint; let quote_token_decimals = event_context.quote_token_decimals; let base_vault_address = event_context.base_vault_address; let quote_vault_address = event_context.quote_vault_address; let payload_result = serde_json::from_str::(decoded_event.payload_json.as_str()); let payload = match payload_result { Ok(payload) => payload, Err(error) => { tracing::warn!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, error = %error, "skipping decoded event with invalid payload_json" ); continue; }, }; let payload_base_vault_address = crate::trade_aggregation::extract_payload_string_by_candidate_keys( &payload, &["baseVault", "base_vault", "baseVaultAddress", "base_vault_address"], ); let payload_quote_vault_address = crate::trade_aggregation::extract_payload_string_by_candidate_keys( &payload, &["quoteVault", "quote_vault", "quoteVaultAddress", "quote_vault_address"], ); let effective_base_vault_address = match base_vault_address.as_deref() { Some(base_vault_address) => Some(base_vault_address), None => payload_base_vault_address.as_deref(), }; let effective_quote_vault_address = match quote_vault_address.as_deref() { Some(quote_vault_address) => Some(quote_vault_address), None => payload_quote_vault_address.as_deref(), }; if !crate::is_decoded_event_trade_candidate(decoded_event.event_kind.as_str(), &payload) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, "skipping non-trade decoded event" ); continue; } if !crate::is_decoded_event_candle_candidate( decoded_event.event_kind.as_str(), &payload, ) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, "skipping non-candle decoded trade candidate" ); continue; } let trade_side = crate::trade_side_resolution::extract_trade_side( decoded_event.event_kind.as_str(), &payload, ); let amount_input = crate::trade_amount_resolution::TradeAmountResolutionInput { database: self.database.as_ref(), transaction: &transaction, decoded_event, payload: &payload, pool_address: pool_address.as_str(), base_token_mint: base_token_mint.as_deref(), quote_token_mint: quote_token_mint.as_deref(), base_token_decimals, quote_token_decimals, base_vault_address: effective_base_vault_address, quote_vault_address: effective_quote_vault_address, }; let amount_resolution = crate::trade_amount_resolution::resolve_trade_amounts(&amount_input).await; let amount_resolution = match amount_resolution { Ok(amount_resolution) => amount_resolution, Err(error) => return Err(error), }; let trade_side = match amount_resolution.resolved_trade_side { Some(resolved_trade_side) => resolved_trade_side, None => trade_side, }; let base_amount_raw = amount_resolution.base_amount_raw.clone(); let quote_amount_raw = amount_resolution.quote_amount_raw.clone(); let price_quote_per_base = amount_resolution.price_quote_per_base; if !crate::trade_metric_update::is_priced_trade_event( base_amount_raw.as_deref(), quote_amount_raw.as_deref(), price_quote_per_base, ) { tracing::debug!( event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, transaction_signature = %transaction.signature, base_amount_raw = ?base_amount_raw, quote_amount_raw = ?quote_amount_raw, price_quote_per_base = ?price_quote_per_base, "skipping unpriced trade aggregation candidate" ); continue; } let materialization_input = crate::trade_event_materialization::TradeEventMaterializationInput { database: self.database.as_ref(), persistence: &self.persistence, transaction: &transaction, transaction_id, decoded_event, decoded_event_id, existing_trade_event: existing_trade_option, pool: &pool, pool_id, pair: &pair, pair_id, trade_side, amount_resolution: &amount_resolution, }; let materialization_result = crate::trade_event_materialization::materialize_trade_event(materialization_input) .await; let materialization_result = match materialization_result { Ok(materialization_result) => materialization_result, Err(error) => return Err(error), }; results.push(materialization_result); } return Ok(results); } } fn extract_payload_string_by_candidate_keys( payload: &serde_json::Value, keys: &[&str], ) -> std::option::Option { let object = match payload.as_object() { Some(object) => object, None => return None, }; for key in keys { let value = object.get(*key); let value = match value { Some(value) => value, None => continue, }; if let Some(text) = value.as_str() { if !text.trim().is_empty() { return Some(text.to_string()); } } } return None; } fn should_skip_pump_fun_duplicate_trade_event( decoded_event: &crate::DexDecodedEventDto, decoded_events: &[crate::DexDecodedEventDto], ) -> bool { if decoded_event.event_kind.as_str() != "pump_fun.trade_event" { return false; } let trade_instruction_id = pump_fun_payload_instruction_id(decoded_event.payload_json.as_str()); for sibling in decoded_events { if sibling.id == decoded_event.id { continue; } if !is_direct_materialized_pump_fun_instruction_trade_kind(sibling.event_kind.as_str()) { continue; } let sibling_instruction_id = pump_fun_payload_instruction_id(sibling.payload_json.as_str()); if trade_instruction_id.is_some() && sibling_instruction_id.is_some() && trade_instruction_id != sibling_instruction_id { continue; } return true; } return false; } fn is_direct_materialized_pump_fun_instruction_trade_kind(event_kind: &str) -> bool { match event_kind { "pump_fun.buy" => return true, "pump_fun.sell" => return true, "pump_fun.buy_exact_sol_in" => return true, _ => return false, } } fn pump_fun_payload_instruction_id(payload_json: &str) -> std::option::Option { let parsed_result = serde_json::from_str::(payload_json); let parsed = match parsed_result { Ok(parsed) => parsed, Err(_) => return None, }; let object = match parsed.as_object() { Some(object) => object, None => return None, }; let value = match object.get("instructionId") { Some(value) => value, None => return None, }; if let Some(number) = value.as_i64() { return Some(number); } if let Some(text) = value.as_str() { let parsed_number = text.parse::(); match parsed_number { Ok(parsed_number) => return Some(parsed_number), Err(_) => return None, } } return None; } fn transaction_has_effective_error(transaction: &crate::ChainTransactionDto) -> bool { let err_json = match transaction.err_json.as_ref() { Some(err_json) => err_json.trim(), None => return false, }; if err_json.is_empty() { return false; } if err_json == "null" { return false; } return true; } #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir must succeed: {}", error), }; let database_path = tempdir.path().join("trade_aggregation.sqlite3"); let config = crate::DatabaseConfig { enabled: true, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::Database::connect_and_initialize(&config).await; let database = match database_result { Ok(database) => database, Err(error) => panic!("database init must succeed: {}", error), }; return std::sync::Arc::new(database); } async fn seed_fluxbeam_swap_transaction_with_err( database: std::sync::Arc, signature: &str, base_amount_raw: &str, quote_amount_raw: &str, meta_err: serde_json::Value, ) { let transaction_model = crate::TransactionModelService::new(database.clone()); let dex_decode = crate::DexDecodeService::new(database.clone()); let dex_detect = crate::DexDetectService::new(database); let resolved_transaction = serde_json::json!({ "slot": 940001, "blockTime": 1779400001, "version": 0, "transaction": { "message": { "instructions": [ { "programId": crate::FLUXBEAM_PROGRAM_ID, "program": "fluxbeam", "stackHeight": 1, "accounts": [ "TradeAggPool111", "TradeAggLpMint111", "TradeAggTokenA111", crate::WSOL_MINT_ID ], "parsed": { "info": { "instruction": "swap", "pool": "TradeAggPool111", "tokenA": "TradeAggTokenA111", "tokenB": crate::WSOL_MINT_ID, "baseAmountRaw": base_amount_raw, "quoteAmountRaw": quote_amount_raw } }, "data": "opaque" } ] } }, "meta": { "err": meta_err, "logMessages": [ "Program log: Instruction: Swap", "Program log: buy" ] } }); let project_result = transaction_model .persist_resolved_transaction( signature, Some("helius_primary_http".to_string()), &resolved_transaction, ) .await; if let Err(error) = project_result { panic!("projection must succeed: {}", error); } let decode_result = dex_decode.decode_transaction_by_signature(signature).await; if let Err(error) = decode_result { panic!("dex decode must succeed: {}", error); } let detect_result = dex_detect.detect_transaction_by_signature(signature).await; if let Err(error) = detect_result { panic!("dex detect must succeed: {}", error); } } async fn seed_fluxbeam_swap_transaction( database: std::sync::Arc, signature: &str, base_amount_raw: &str, quote_amount_raw: &str, ) { seed_fluxbeam_swap_transaction_with_err( database, signature, base_amount_raw, quote_amount_raw, serde_json::Value::Null, ) .await; } async fn seed_failed_fluxbeam_swap_transaction( database: std::sync::Arc, signature: &str, base_amount_raw: &str, quote_amount_raw: &str, ) { seed_fluxbeam_swap_transaction_with_err( database, signature, base_amount_raw, quote_amount_raw, serde_json::json!({ "InstructionError": [0, { "Custom": 1 }] }), ) .await; } #[tokio::test] async fn record_transaction_by_signature_creates_trade_event_and_pair_metric() { let database = make_database().await; seed_fluxbeam_swap_transaction(database.clone(), "sig-trade-aggregation-1", "1000", "2500") .await; let service = crate::TradeAggregationService::new(database.clone()); let record_result = service.record_transaction_by_signature("sig-trade-aggregation-1").await; let results = match record_result { Ok(results) => results, Err(error) => panic!("trade aggregation must succeed: {}", error), }; assert_eq!(results.len(), 1); assert!(results[0].created_trade_event); let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), results[0].pair_id).await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); assert_eq!(trade_events[0].trade_side, crate::SwapTradeSide::BuyBase); assert_eq!(trade_events[0].base_amount_raw, Some("1000".to_string())); assert_eq!(trade_events[0].quote_amount_raw, Some("2500".to_string())); assert_eq!(trade_events[0].price_quote_per_base, Some(2.5)); let pair_metric_result = crate::query_pair_metrics_get_by_pair_id(database.as_ref(), results[0].pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => panic!("pair metric fetch must succeed: {}", error), }; let pair_metric = match pair_metric_option { Some(pair_metric) => pair_metric, None => panic!("pair metric must exist"), }; assert_eq!(pair_metric.trade_count, 1); assert_eq!(pair_metric.buy_count, 1); assert_eq!(pair_metric.sell_count, 0); assert_eq!(pair_metric.cumulative_base_amount_raw, Some("1000".to_string())); assert_eq!(pair_metric.cumulative_quote_amount_raw, Some("2500".to_string())); assert_eq!(pair_metric.last_price_quote_per_base, Some(2.5)); } #[tokio::test] async fn record_transaction_by_signature_is_idempotent() { let database = make_database().await; seed_fluxbeam_swap_transaction(database.clone(), "sig-trade-aggregation-2", "1000", "2500") .await; let service = crate::TradeAggregationService::new(database.clone()); let first_result = service.record_transaction_by_signature("sig-trade-aggregation-2").await; let first_results = match first_result { Ok(first_results) => first_results, Err(error) => panic!("first trade aggregation must succeed: {}", error), }; assert_eq!(first_results.len(), 1); assert!(first_results[0].created_trade_event); let second_result = service.record_transaction_by_signature("sig-trade-aggregation-2").await; let second_results = match second_result { Ok(second_results) => second_results, Err(error) => panic!("second trade aggregation must succeed: {}", error), }; assert_eq!(second_results.len(), 1); assert!(!second_results[0].created_trade_event); let trade_events_result = crate::query_trade_events_list_by_pair_id(database.as_ref(), first_results[0].pair_id) .await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 1); let pair_metric_result = crate::query_pair_metrics_get_by_pair_id(database.as_ref(), first_results[0].pair_id) .await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => panic!("pair metric fetch must succeed: {}", error), }; let pair_metric = match pair_metric_option { Some(pair_metric) => pair_metric, None => panic!("pair metric must exist"), }; assert_eq!(pair_metric.trade_count, 1); } #[test] fn transaction_null_err_json_is_not_effective_error() { let transaction = crate::ChainTransactionDto::new( "sig-null-err".to_string(), Some(1), None, Some("test".to_string()), Some("0".to_string()), Some("null".to_string()), None, serde_json::json!({"meta":{"err":null}}).to_string(), ); assert!(!super::transaction_has_effective_error(&transaction)); } #[test] fn transaction_empty_err_json_is_not_effective_error() { let transaction = crate::ChainTransactionDto::new( "sig-empty-err".to_string(), Some(1), None, Some("test".to_string()), Some("0".to_string()), Some("".to_string()), None, serde_json::json!({"meta":{"err":null}}).to_string(), ); assert!(!super::transaction_has_effective_error(&transaction)); } #[test] fn transaction_non_null_err_json_is_effective_error() { let transaction = crate::ChainTransactionDto::new( "sig-real-err".to_string(), Some(1), None, Some("test".to_string()), Some("0".to_string()), Some(serde_json::json!({"InstructionError":[0,{"Custom":1}]}).to_string()), None, serde_json::json!({"meta":{"err":{"InstructionError":[0,{"Custom":1}]}}}).to_string(), ); assert!(super::transaction_has_effective_error(&transaction)); } #[tokio::test] async fn record_transaction_by_signature_skips_failed_transaction() { let database = make_database().await; seed_failed_fluxbeam_swap_transaction( database.clone(), "sig-trade-aggregation-failed-1", "1000", "2500", ) .await; let transaction_result = crate::query_chain_transactions_get_by_signature( database.as_ref(), "sig-trade-aggregation-failed-1", ) .await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => panic!("transaction fetch must succeed: {}", error), }; let transaction = match transaction_option { Some(transaction) => transaction, None => panic!("transaction must exist"), }; let transaction_id = match transaction.id { Some(transaction_id) => transaction_id, None => panic!("transaction id must exist"), }; let service = crate::TradeAggregationService::new(database.clone()); let record_result = service.record_transaction_by_signature("sig-trade-aggregation-failed-1").await; let results = match record_result { Ok(results) => results, Err(error) => panic!("failed transaction aggregation must not fail: {}", error), }; assert_eq!(results.len(), 0); let trade_events_result = crate::query_trade_events_list_by_transaction_id(database.as_ref(), transaction_id) .await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => panic!("trade event list must succeed: {}", error), }; assert_eq!(trade_events.len(), 0); } }