// file: kb_lib/src/dex_event_classification.rs //! Shared DEX event classification and decoded-payload enrichment. //! //! This module contains deterministic helpers used by DEX decoding, //! trade aggregation and future non-trade event materialization. //! //! It intentionally does not decode protocol instructions and does not //! perform database access. /// Stable business category assigned to one decoded DEX event kind. #[derive(Debug, Copy, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub enum DexEventCategory { /// Swap-like event that can potentially become a normalized trade. Trade, /// Liquidity deposit, withdraw, position open or position close event. Liquidity, /// Fee collection event. Fee, /// Reward or emission event. Reward, /// Pool creation, initialization or migration event. PoolLifecycle, /// Protocol administration, configuration or permission update event. Admin, /// Event kind that is not classified yet. Unknown, } impl DexEventCategory { /// Returns the stable string code persisted inside decoded payload metadata. pub fn as_str(self) -> &'static str { match self { Self::Trade => return "trade", Self::Liquidity => return "liquidity", Self::Fee => return "fee", Self::Reward => return "reward", Self::PoolLifecycle => return "pool_lifecycle", Self::Admin => return "admin", Self::Unknown => return "unknown", } } } /// Classifies a DEX event kind into a stable business category. pub fn classify_dex_event_category(event_kind: &str) -> DexEventCategory { if is_dex_reward_event_kind(event_kind) { return DexEventCategory::Reward; } if is_dex_fee_event_kind(event_kind) { return DexEventCategory::Fee; } if is_dex_liquidity_event_kind(event_kind) { return DexEventCategory::Liquidity; } if is_dex_pool_lifecycle_event_kind(event_kind) { return DexEventCategory::PoolLifecycle; } if is_dex_admin_event_kind(event_kind) { return DexEventCategory::Admin; } if is_dex_trade_event_kind(event_kind) { return DexEventCategory::Trade; } return DexEventCategory::Unknown; } /// Classifies a DEX event kind and returns the persisted category code. pub fn classify_dex_event_category_code(event_kind: &str) -> &'static str { return classify_dex_event_category(event_kind).as_str(); } /// Returns true when the event kind represents a swap-like event. pub fn is_dex_trade_event_kind(event_kind: &str) -> bool { if event_kind.ends_with(".buy") { return true; } if event_kind.ends_with(".sell") { return true; } if event_kind.ends_with(".swap") { return true; } if event_kind.contains(".swap_") { return true; } if event_kind.ends_with(".exact_input") { return true; } if event_kind.ends_with(".exact_output") { return true; } return false; } /// Returns true when the event kind can directly produce a candle candidate. pub fn is_dex_candle_candidate_event_kind(event_kind: &str) -> bool { if event_kind.contains("router") { return false; } if event_kind.contains("route") { return false; } return is_dex_trade_event_kind(event_kind); } /// Returns true for liquidity lifecycle changes that must not become candles. pub fn is_dex_liquidity_event_kind(event_kind: &str) -> bool { if event_kind.contains(".deposit") { return true; } if event_kind.contains(".withdraw") { return true; } if event_kind.contains(".increase_liquidity") { return true; } if event_kind.contains(".decrease_liquidity") { return true; } if event_kind.contains(".open_position") { return true; } if event_kind.contains(".close_position") { return true; } return false; } /// Returns true for fee collection events. pub fn is_dex_fee_event_kind(event_kind: &str) -> bool { if event_kind.contains("collect_creator_fee") { return true; } if event_kind.contains("collect_protocol_fee") { return true; } if event_kind.contains("collect_fund_fee") { return true; } if event_kind.contains("collect_fee") { return true; } return false; } /// Returns true for reward or incentive events. pub fn is_dex_reward_event_kind(event_kind: &str) -> bool { if event_kind.contains("reward") { return true; } if event_kind.contains("emission") { return true; } return false; } /// Returns true for pool creation, initialization or migration events. pub fn is_dex_pool_lifecycle_event_kind(event_kind: &str) -> bool { if event_kind.contains(".initialize") { return true; } if event_kind.contains(".initialize_with_permission") { return true; } if event_kind.contains(".create_pool") { return true; } if event_kind.contains(".create_v2_token") { return true; } if event_kind.contains(".migrate") { return true; } return false; } /// Returns true for admin, configuration or permission changes. pub fn is_dex_admin_event_kind(event_kind: &str) -> bool { if event_kind.contains("admin") { return true; } if event_kind.contains("config") { return true; } if event_kind.contains("permission") { return true; } if event_kind.contains("set_") { return true; } if event_kind.contains("update_") { return true; } return false; } /// Returns true when a decoded payload contains at least one direct amount or price field. /// /// This is a conservative payload-level check. It does not inspect transaction /// token balance deltas and is intended for protocol decoders that cannot yet /// produce a deterministic materializable swap payload. pub(crate) fn decoded_payload_has_trade_amount_or_price_payload( payload: &serde_json::Value, ) -> bool { return value_contains_any_non_null_key( payload, &[ "baseAmountRaw", "base_amount_raw", "baseAmount", "amountBase", "amountInBase", "quoteAmountRaw", "quote_amount_raw", "quoteAmount", "amountQuote", "amountOutQuote", "amountIn", "amountOut", "priceQuotePerBase", "price_quote_per_base", "quotePerBase", "lastPriceQuotePerBase", ], ); } /// Returns true when a decoded payload is marked as a trade candidate. /// /// Explicit payload metadata wins over event-kind inference. This allows /// incomplete decoded events, such as incomplete PumpSwap trades, to opt out. pub fn is_decoded_event_trade_candidate(event_kind: &str, payload: &serde_json::Value) -> bool { let trade_candidate_option = extract_top_level_bool_by_candidate_keys(payload, &["tradeCandidate", "trade_candidate"]); if let Some(trade_candidate) = trade_candidate_option { return trade_candidate; } let event_category_option = extract_string_by_candidate_keys(payload, &["eventCategory", "event_category"]); if let Some(event_category) = event_category_option { return event_category.as_str() == DexEventCategory::Trade.as_str(); } return is_dex_trade_event_kind(event_kind); } /// Returns true when a decoded payload can be materialized as a candle candidate. pub fn is_decoded_event_candle_candidate(event_kind: &str, payload: &serde_json::Value) -> bool { let candle_candidate_option = extract_top_level_bool_by_candidate_keys(payload, &["candleCandidate", "candle_candidate"]); if let Some(candle_candidate) = candle_candidate_option { return candle_candidate; } if !is_decoded_event_trade_candidate(event_kind, payload) { return false; } return is_dex_candle_candidate_event_kind(event_kind); } /// Enriches a decoded payload with non-destructive classification metadata. pub fn enrich_dex_decoded_payload( protocol_name: &str, event_kind: &str, payload_json: serde_json::Value, ) -> serde_json::Value { let event_category = classify_dex_event_category_code(event_kind); let trade_candidate = is_dex_trade_event_kind(event_kind); let candle_candidate = is_dex_candle_candidate_event_kind(event_kind); let mut object = match payload_json { serde_json::Value::Object(object) => object, other => { let mut object = serde_json::Map::new(); object.insert("rawPayload".to_owned(), other); object }, }; json_insert_string_if_missing(&mut object, "protocolName", protocol_name); json_insert_string_if_missing(&mut object, "eventKind", event_kind); json_insert_string_if_missing(&mut object, "eventCategory", event_category); json_insert_bool_if_missing(&mut object, "tradeCandidate", trade_candidate); json_insert_bool_if_missing(&mut object, "candleCandidate", candle_candidate); json_insert_i64_if_missing(&mut object, "eventClassificationVersion", 1); if !trade_candidate { json_insert_string_if_missing(&mut object, "skipTradeReason", "non_trade_event"); } else if !candle_candidate { json_insert_string_if_missing( &mut object, "skipCandleReason", "route_or_multihop_event_requires_leg_resolution", ); } return serde_json::Value::Object(object); } /// Enriches a decoded payload and serializes it as JSON. pub fn enrich_and_serialize_dex_decoded_payload( protocol_name: &str, event_kind: &str, payload_json: serde_json::Value, ) -> Result { let enriched_payload = enrich_dex_decoded_payload(protocol_name, event_kind, payload_json); let payload_json_result = serde_json::to_string(&enriched_payload); match payload_json_result { Ok(payload_json) => return Ok(payload_json), Err(error) => { return Err(crate::Error::Json(format!( "cannot serialize enriched decoded payload for '{}': {}", event_kind, error ))); }, } } /// Parses, enriches and serializes a decoded payload. pub fn enrich_serialized_dex_decoded_payload( protocol_name: &str, event_kind: &str, payload_json: &str, ) -> Result { let payload_value_result = serde_json::from_str::(payload_json); let payload_value = match payload_value_result { Ok(payload_value) => payload_value, Err(error) => { return Err(crate::Error::Json(format!( "cannot parse decoded payload for '{}': {}", event_kind, error ))); }, }; return enrich_and_serialize_dex_decoded_payload(protocol_name, event_kind, payload_value); } fn json_insert_string_if_missing( object: &mut serde_json::Map, key: &str, value: &str, ) { if object.contains_key(key) { return; } object.insert(key.to_owned(), serde_json::Value::String(value.to_owned())); } fn json_insert_bool_if_missing( object: &mut serde_json::Map, key: &str, value: bool, ) { if object.contains_key(key) { return; } object.insert(key.to_owned(), serde_json::Value::Bool(value)); } fn json_insert_i64_if_missing( object: &mut serde_json::Map, key: &str, value: i64, ) { if object.contains_key(key) { return; } object.insert(key.to_owned(), serde_json::Value::Number(serde_json::Number::from(value))); } fn value_contains_any_non_null_key(value: &serde_json::Value, candidate_keys: &[&str]) -> bool { if let Some(object) = value.as_object() { for candidate_key in candidate_keys { let candidate_value_option = object.get(*candidate_key); if let Some(candidate_value) = candidate_value_option { if !candidate_value.is_null() { if let Some(text) = candidate_value.as_str() { if text.trim().is_empty() { continue; } } return true; } } } for nested_value in object.values() { if value_contains_any_non_null_key(nested_value, candidate_keys) { return true; } } return false; } if let Some(array) = value.as_array() { for nested_value in array { if value_contains_any_non_null_key(nested_value, candidate_keys) { return true; } } } return false; } fn extract_top_level_bool_by_candidate_keys( payload: &serde_json::Value, candidate_keys: &[&str], ) -> std::option::Option { let object = match payload.as_object() { Some(object) => object, None => return None, }; for candidate_key in candidate_keys { let value_option = object.get(*candidate_key); let value = match value_option { Some(value) => value, None => continue, }; if let Some(value_bool) = value.as_bool() { return Some(value_bool); } if let Some(value_i64) = value.as_i64() { return Some(value_i64 != 0); } if let Some(value_u64) = value.as_u64() { return Some(value_u64 != 0); } if let Some(value_text) = value.as_str() { let normalized = value_text.trim().to_ascii_lowercase(); if normalized.as_str() == "true" { return Some(true); } if normalized.as_str() == "false" { return Some(false); } if normalized.as_str() == "1" { return Some(true); } if normalized.as_str() == "0" { return Some(false); } } } return None; } fn extract_string_by_candidate_keys( value: &serde_json::Value, candidate_keys: &[&str], ) -> std::option::Option { if let Some(object) = value.as_object() { for candidate_key in candidate_keys { let direct_option = object.get(*candidate_key); if let Some(direct) = direct_option { let direct_text_option = direct.as_str(); if let Some(direct_text) = direct_text_option { return Some(direct_text.to_string()); } } } for nested_value in object.values() { let nested_result = extract_string_by_candidate_keys(nested_value, candidate_keys); if nested_result.is_some() { return nested_result; } } return None; } if let Some(array) = value.as_array() { for nested_value in array { let nested_result = extract_string_by_candidate_keys(nested_value, candidate_keys); if nested_result.is_some() { return nested_result; } } } return None; } #[cfg(test)] mod tests { #[test] fn classifies_swap_events_as_trade_candidates() { assert_eq!( super::classify_dex_event_category_code("raydium_cpmm.swap_base_input"), "trade" ); assert_eq!( super::classify_dex_event_category_code("raydium_cpmm.swap_base_output"), "trade" ); assert_eq!(super::classify_dex_event_category_code("raydium_clmm.swap"), "trade"); assert_eq!(super::classify_dex_event_category_code("raydium_clmm.swap_v2"), "trade"); assert_eq!(super::classify_dex_event_category_code("raydium_clmm.exact_output"), "trade"); assert_eq!(super::classify_dex_event_category_code("pump_fun.buy"), "trade"); assert!(super::is_dex_trade_event_kind("raydium_cpmm.swap_base_input")); assert!(super::is_dex_candle_candidate_event_kind("raydium_cpmm.swap_base_input")); } #[test] fn classifies_router_swap_as_trade_but_not_direct_candle_candidate() { assert_eq!( super::classify_dex_event_category_code("raydium_clmm.swap_router_base_in"), "trade" ); assert!(super::is_dex_trade_event_kind("raydium_clmm.swap_router_base_in")); assert!(!super::is_dex_candle_candidate_event_kind("raydium_clmm.swap_router_base_in")); } #[test] fn classifies_fee_reward_liquidity_and_lifecycle_events() { assert_eq!( super::classify_dex_event_category_code("raydium_cpmm.collect_creator_fee"), "fee" ); assert_eq!( super::classify_dex_event_category_code("raydium_clmm.collect_protocol_fee"), "fee" ); assert_eq!( super::classify_dex_event_category_code("raydium_clmm.set_reward_params"), "reward" ); assert_eq!( super::classify_dex_event_category_code("raydium_clmm.increase_liquidity_v2"), "liquidity" ); assert_eq!( super::classify_dex_event_category_code("raydium_cpmm.initialize"), "pool_lifecycle" ); } #[test] fn enriched_payload_keeps_existing_fields() { let payload_json = serde_json::json!({ "eventCategory": "custom", "amountIn": "10" }); let enriched_payload = super::enrich_dex_decoded_payload( "raydium_cpmm", "raydium_cpmm.swap_base_input", payload_json, ); let object_option = enriched_payload.as_object(); let object = match object_option { Some(object) => object, None => { panic!("expected enriched payload object"); }, }; assert_eq!( object.get("eventCategory"), Some(&serde_json::Value::String("custom".to_owned())) ); assert_eq!( object.get("protocolName"), Some(&serde_json::Value::String("raydium_cpmm".to_owned())) ); assert_eq!( object.get("eventKind"), Some(&serde_json::Value::String("raydium_cpmm.swap_base_input".to_owned())) ); assert_eq!(object.get("tradeCandidate"), Some(&serde_json::Value::Bool(true))); assert_eq!(object.get("candleCandidate"), Some(&serde_json::Value::Bool(true))); } #[test] fn decoded_event_payload_candidate_flags_win_over_event_kind() { let payload_json = serde_json::json!({ "tradeCandidate": false, "candleCandidate": false }); assert!(!super::is_decoded_event_trade_candidate("pump_swap.buy", &payload_json)); assert!(!super::is_decoded_event_candle_candidate("pump_swap.buy", &payload_json)); } #[test] fn detects_direct_amount_or_price_payload_recursively() { let payload_json = serde_json::json!({ "nested": { "quoteAmountRaw": "2500" } }); assert!(super::decoded_payload_has_trade_amount_or_price_payload(&payload_json)); let empty_payload_json = serde_json::json!({ "nested": { "quoteAmountRaw": "" } }); assert!(!super::decoded_payload_has_trade_amount_or_price_payload(&empty_payload_json)); } }