// file: kb_lib/src/non_trade_event_materialization.rs //! Materialization of useful non-trade DEX events. //! //! This service persists liquidity, pool lifecycle, fee, reward and pool //! administration events from already decoded DEX events. It deliberately does //! not feed trade, metric or candle materialization. /// Result of non-trade event materialization for one transaction. #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] pub struct NonTradeEventMaterializationResult { /// Number of liquidity events inserted or refreshed. pub liquidity_event_count: usize, /// Number of pool lifecycle events inserted or refreshed. pub pool_lifecycle_event_count: usize, /// Number of fee events inserted or refreshed. pub fee_event_count: usize, /// Number of reward events inserted or refreshed. pub reward_event_count: usize, /// Number of pool administration events inserted or refreshed. pub pool_admin_event_count: usize, } /// Materializes useful non-trade decoded DEX events. #[derive(Debug, Clone)] pub struct NonTradeEventMaterializationService { database: std::sync::Arc, } struct NonTradeDecodedEventContext { dex_id: std::option::Option, pool_id: std::option::Option, pair_id: std::option::Option, pair: std::option::Option, } impl NonTradeEventMaterializationService { /// Creates a new non-trade event materialization service. pub fn new(database: std::sync::Arc) -> Self { return Self { database }; } /// Materializes useful non-trade events for one persisted transaction signature. pub async fn record_transaction_by_signature( &self, signature: &str, ) -> Result { let transaction_result = crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature) .await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => return Err(error), }; let transaction = match transaction_option { Some(transaction) => transaction, None => { return Err(crate::Error::InvalidState(format!( "cannot materialize non-trade events for unknown transaction '{}'", signature ))); }, }; if transaction.err_json.is_some() { tracing::debug!( signature = %transaction.signature, "skipping non-trade materialization for failed transaction" ); return Ok(crate::NonTradeEventMaterializationResult::default()); } let transaction_id = match transaction.id { Some(transaction_id) => transaction_id, None => { return Err(crate::Error::InvalidState(format!( "transaction '{}' has no internal id", transaction.signature ))); }, }; let decoded_events_result = crate::query_dex_decoded_events_list_by_transaction_id( self.database.as_ref(), transaction_id, ) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let mut result = crate::NonTradeEventMaterializationResult::default(); for decoded_event in &decoded_events { let payload_result = serde_json::from_str::(decoded_event.payload_json.as_str()); let payload = match payload_result { Ok(payload) => payload, Err(error) => { tracing::warn!( signature = %transaction.signature, event_kind = %decoded_event.event_kind, error = %error, "skipping non-trade materialization for invalid decoded payload" ); continue; }, }; if crate::is_dex_liquidity_event_kind(decoded_event.event_kind.as_str()) { let materialized = self .materialize_liquidity_event( &transaction, transaction_id, decoded_event, &payload, ) .await; match materialized { Ok(was_materialized) => { if was_materialized { result.liquidity_event_count += 1; } }, Err(error) => return Err(error), } } if crate::is_dex_pool_lifecycle_event_kind(decoded_event.event_kind.as_str()) { let materialized = self .materialize_pool_lifecycle_event(&transaction, transaction_id, decoded_event) .await; match materialized { Ok(was_materialized) => { if was_materialized { result.pool_lifecycle_event_count += 1; } }, Err(error) => return Err(error), } } if crate::is_dex_fee_event_kind(decoded_event.event_kind.as_str()) { let materialized = self .materialize_fee_event(&transaction, transaction_id, decoded_event, &payload) .await; match materialized { Ok(was_materialized) => { if was_materialized { result.fee_event_count += 1; } }, Err(error) => return Err(error), } } if crate::is_dex_reward_event_kind(decoded_event.event_kind.as_str()) { let materialized = self .materialize_reward_event(&transaction, transaction_id, decoded_event, &payload) .await; match materialized { Ok(was_materialized) => { if was_materialized { result.reward_event_count += 1; } }, Err(error) => return Err(error), } } if crate::is_dex_admin_event_kind(decoded_event.event_kind.as_str()) { let materialized = self .materialize_pool_admin_event( &transaction, transaction_id, decoded_event, &payload, ) .await; match materialized { Ok(was_materialized) => { if was_materialized { result.pool_admin_event_count += 1; } }, Err(error) => return Err(error), } } } return Ok(result); } async fn materialize_pool_lifecycle_event( &self, transaction: &crate::ChainTransactionDto, transaction_id: i64, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => return Ok(false), }; let context = self.resolve_decoded_event_context(decoded_event).await; let context = match context { Ok(context) => context, Err(error) => return Err(error), }; let dto = crate::PoolLifecycleEventDto::new( transaction_id, Some(decoded_event_id), context.dex_id, context.pool_id, context.pair_id, transaction.signature.clone(), transaction.slot, decoded_event.protocol_name.clone(), decoded_event.program_id.clone(), decoded_event.event_kind.clone(), decoded_event.pool_account.clone(), decoded_event.token_a_mint.clone(), decoded_event.token_b_mint.clone(), decoded_event.payload_json.clone(), ); let upsert_result = crate::query_pool_lifecycle_events_upsert(self.database.as_ref(), &dto).await; match upsert_result { Ok(_) => return Ok(true), Err(error) => return Err(error), } } async fn materialize_fee_event( &self, transaction: &crate::ChainTransactionDto, transaction_id: i64, decoded_event: &crate::DexDecodedEventDto, payload: &serde_json::Value, ) -> Result { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => return Ok(false), }; let context = self.resolve_decoded_event_context(decoded_event).await; let context = match context { Ok(context) => context, Err(error) => return Err(error), }; let actor_wallet = extract_first_string( payload, &[ "actorWallet", "actor_wallet", "receiver", "recipient", "owner", "payer", "authority", "user", ], ); let fee_token_mint = extract_first_string( payload, &[ "feeTokenMint", "fee_token_mint", "tokenMint", "token_mint", "mint", "quoteMint", "quote_mint", ], ); let fee_amount_raw = extract_first_amount_string( payload, &[ "feeAmountRaw", "fee_amount_raw", "feeAmount", "fee_amount", "protocolFeeAmount", "protocol_fee_amount", "fundFeeAmount", "fund_fee_amount", "creatorFeeAmount", "creator_fee_amount", "amount", ], ); let dto = crate::FeeEventDto::new( transaction_id, Some(decoded_event_id), context.dex_id, context.pool_id, context.pair_id, transaction.signature.clone(), transaction.slot, decoded_event.protocol_name.clone(), decoded_event.program_id.clone(), decoded_event.event_kind.clone(), decoded_event.pool_account.clone(), actor_wallet, fee_token_mint, fee_amount_raw, decoded_event.payload_json.clone(), ); let upsert_result = crate::query_fee_events_upsert(self.database.as_ref(), &dto).await; match upsert_result { Ok(_) => return Ok(true), Err(error) => return Err(error), } } async fn materialize_reward_event( &self, transaction: &crate::ChainTransactionDto, transaction_id: i64, decoded_event: &crate::DexDecodedEventDto, payload: &serde_json::Value, ) -> Result { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => return Ok(false), }; let context = self.resolve_decoded_event_context(decoded_event).await; let context = match context { Ok(context) => context, Err(error) => return Err(error), }; let actor_wallet = extract_first_string( payload, &[ "actorWallet", "actor_wallet", "receiver", "recipient", "owner", "payer", "authority", "user", ], ); let reward_token_mint = extract_first_string( payload, &["rewardTokenMint", "reward_token_mint", "tokenMint", "token_mint", "mint"], ); let reward_amount_raw = extract_first_amount_string( payload, &[ "rewardAmountRaw", "reward_amount_raw", "rewardAmount", "reward_amount", "emissionAmount", "emission_amount", "amount", ], ); let dto = crate::RewardEventDto::new( transaction_id, Some(decoded_event_id), context.dex_id, context.pool_id, context.pair_id, transaction.signature.clone(), transaction.slot, decoded_event.protocol_name.clone(), decoded_event.program_id.clone(), decoded_event.event_kind.clone(), decoded_event.pool_account.clone(), actor_wallet, reward_token_mint, reward_amount_raw, decoded_event.payload_json.clone(), ); let upsert_result = crate::query_reward_events_upsert(self.database.as_ref(), &dto).await; match upsert_result { Ok(_) => return Ok(true), Err(error) => return Err(error), } } async fn materialize_pool_admin_event( &self, transaction: &crate::ChainTransactionDto, transaction_id: i64, decoded_event: &crate::DexDecodedEventDto, payload: &serde_json::Value, ) -> Result { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => return Ok(false), }; let context = self.resolve_decoded_event_context(decoded_event).await; let context = match context { Ok(context) => context, Err(error) => return Err(error), }; let actor_wallet = extract_first_string( payload, &["actorWallet", "actor_wallet", "authority", "admin", "owner", "payer", "user"], ); let admin_action = match extract_first_string( payload, &["adminAction", "admin_action", "action", "configAction", "config_action"], ) { Some(admin_action) => Some(admin_action), None => Some(decoded_event.event_kind.clone()), }; let dto = crate::PoolAdminEventDto::new( transaction_id, Some(decoded_event_id), context.dex_id, context.pool_id, context.pair_id, transaction.signature.clone(), transaction.slot, decoded_event.protocol_name.clone(), decoded_event.program_id.clone(), decoded_event.event_kind.clone(), decoded_event.pool_account.clone(), actor_wallet, admin_action, decoded_event.payload_json.clone(), ); let upsert_result = crate::query_pool_admin_events_upsert(self.database.as_ref(), &dto).await; match upsert_result { Ok(_) => return Ok(true), Err(error) => return Err(error), } } async fn materialize_liquidity_event( &self, transaction: &crate::ChainTransactionDto, transaction_id: i64, decoded_event: &crate::DexDecodedEventDto, payload: &serde_json::Value, ) -> Result { let decoded_event_id = match decoded_event.id { Some(decoded_event_id) => decoded_event_id, None => return Ok(false), }; let context = self.resolve_decoded_event_context(decoded_event).await; let context = match context { Ok(context) => context, Err(error) => return Err(error), }; let dex_id = match context.dex_id { Some(dex_id) => dex_id, None => return Ok(false), }; let pool_id = match context.pool_id { Some(pool_id) => pool_id, None => return Ok(false), }; let pair = match context.pair { Some(pair) => pair, None => return Ok(false), }; let pair_id = match pair.id { Some(pair_id) => Some(pair_id), None => None, }; let event_kind = if crate::is_dex_position_open_event_kind(decoded_event.event_kind.as_str()) { crate::LiquidityEventKind::PositionOpen } else if crate::is_dex_position_close_event_kind(decoded_event.event_kind.as_str()) { crate::LiquidityEventKind::PositionClose } else if crate::is_dex_liquidity_remove_event_kind(decoded_event.event_kind.as_str()) { crate::LiquidityEventKind::Remove } else { crate::LiquidityEventKind::Add }; let actor_wallet = extract_first_string( payload, &[ "actorWallet", "actor_wallet", "user", "owner", "payer", "authority", "liquidityProvider", "liquidity_provider", ], ); let base_amount = extract_first_amount_string( payload, &[ "baseAmountRaw", "base_amount_raw", "baseAmount", "base_amount", "amountBase", "amount_base", "tokenAAmount", "token_a_amount", "amountA", "amount_a", ], ); let quote_amount = extract_first_amount_string( payload, &[ "quoteAmountRaw", "quote_amount_raw", "quoteAmount", "quote_amount", "amountQuote", "amount_quote", "tokenBAmount", "token_b_amount", "amountB", "amount_b", ], ); let lp_amount = extract_first_amount_string( payload, &[ "lpAmountRaw", "lp_amount_raw", "lpAmount", "lp_amount", "liquidity", "liquidityAmount", "liquidity_amount", ], ); let amounts_are_complete = base_amount.is_some() && quote_amount.is_some(); let base_amount_value = match base_amount { Some(base_amount_value) => base_amount_value, None => "0".to_string(), }; let quote_amount_value = match quote_amount { Some(quote_amount_value) => quote_amount_value, None => "0".to_string(), }; let dto = crate::LiquidityEventDto::new( dex_id, pool_id, pair_id, transaction.signature.clone(), decoded_event_id, transaction.slot, event_kind, actor_wallet, pair.base_token_id, pair.quote_token_id, None, base_amount_value, quote_amount_value, lp_amount, ) .with_decoded_event_metadata( Some(transaction_id), Some(decoded_event_id), Some(decoded_event.program_id.clone()), Some(decoded_event.event_kind.clone()), Some(decoded_event.payload_json.clone()), amounts_are_complete, ); let upsert_result = crate::query_liquidity_events_upsert(self.database.as_ref(), &dto).await; match upsert_result { Ok(_) => return Ok(true), Err(error) => return Err(error), } } async fn resolve_decoded_event_context( &self, decoded_event: &crate::DexDecodedEventDto, ) -> Result { let dex_result = crate::query_dexs_get_by_code( self.database.as_ref(), decoded_event.protocol_name.as_str(), ) .await; let dex_id = match dex_result { Ok(Some(dex)) => dex.id, Ok(None) => None, Err(error) => return Err(error), }; let pool_address = match decoded_event.pool_account.clone() { Some(pool_address) => pool_address, None => { return Ok(NonTradeDecodedEventContext { dex_id, pool_id: None, pair_id: None, pair: None, }); }, }; let pool_result = crate::query_pools_get_by_address(self.database.as_ref(), pool_address.as_str()).await; let pool = match pool_result { Ok(Some(pool)) => pool, Ok(None) => { return Ok(NonTradeDecodedEventContext { dex_id, pool_id: None, pair_id: None, pair: None, }); }, Err(error) => return Err(error), }; let pool_id = match pool.id { Some(pool_id) => pool_id, None => { return Ok(NonTradeDecodedEventContext { dex_id, pool_id: None, pair_id: None, pair: None, }); }, }; let pair_result = crate::query_pairs_get_by_pool_id(self.database.as_ref(), pool_id).await; let pair = match pair_result { Ok(pair) => pair, Err(error) => return Err(error), }; let pair_id = match pair.as_ref() { Some(pair) => pair.id, None => None, }; return Ok(NonTradeDecodedEventContext { dex_id, pool_id: Some(pool_id), pair_id, pair, }); } } fn extract_first_amount_string( value: &serde_json::Value, candidate_keys: &[&str], ) -> std::option::Option { let text = extract_first_string(value, candidate_keys); if text.is_some() { return text; } return extract_first_number_as_string(value, candidate_keys); } fn extract_first_string( value: &serde_json::Value, candidate_keys: &[&str], ) -> std::option::Option { if let Some(object) = value.as_object() { for candidate_key in candidate_keys { let value_option = object.get(*candidate_key); let candidate = match value_option { Some(candidate) => candidate, None => continue, }; if let Some(text) = candidate.as_str() { let trimmed = text.trim(); if !trimmed.is_empty() { return Some(trimmed.to_string()); } } } for nested_value in object.values() { let nested = extract_first_string(nested_value, candidate_keys); if nested.is_some() { return nested; } } return None; } if let Some(array) = value.as_array() { for nested_value in array { let nested = extract_first_string(nested_value, candidate_keys); if nested.is_some() { return nested; } } } return None; } fn extract_first_number_as_string( value: &serde_json::Value, candidate_keys: &[&str], ) -> std::option::Option { if let Some(object) = value.as_object() { for candidate_key in candidate_keys { let value_option = object.get(*candidate_key); let candidate = match value_option { Some(candidate) => candidate, None => continue, }; if let Some(number) = candidate.as_i64() { return Some(number.to_string()); } if let Some(number) = candidate.as_u64() { return Some(number.to_string()); } if let Some(number) = candidate.as_f64() { return Some(number.to_string()); } } for nested_value in object.values() { let nested = extract_first_number_as_string(nested_value, candidate_keys); if nested.is_some() { return nested; } } return None; } if let Some(array) = value.as_array() { for nested_value in array { let nested = extract_first_number_as_string(nested_value, candidate_keys); if nested.is_some() { return nested; } } } return None; } #[cfg(test)] mod tests { #[test] fn extracts_nested_liquidity_amounts() { let payload = serde_json::json!({ "event": { "baseAmountRaw": "100", "quoteAmountRaw": 25, "owner": "Owner111111111111111111111111111111111111" } }); assert_eq!( super::extract_first_amount_string(&payload, &["baseAmountRaw"]), Some("100".to_string()) ); assert_eq!( super::extract_first_amount_string(&payload, &["quoteAmountRaw"]), Some("25".to_string()) ); assert_eq!( super::extract_first_string(&payload, &["owner"]), Some("Owner111111111111111111111111111111111111".to_string()) ); } }