From 3e994995d74eaab7792e01d0f058d8a01bd7cd3a Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sun, 3 May 2026 18:05:32 +0200 Subject: [PATCH] 0.7.24 --- ROADMAP.md | 107 +++- kb_lib/src/dex.rs | 5 + kb_lib/src/dex/raydium_clmm.rs | 500 ++++++++++++++++ kb_lib/src/dex_decode.rs | 799 ++++++++++++++++++++++---- kb_lib/src/dex_detect.rs | 246 ++++++++ kb_lib/src/lib.rs | 4 + kb_lib/src/pair_candle_aggregation.rs | 22 +- kb_lib/src/trade_aggregation.rs | 227 +++++++- 8 files changed, 1765 insertions(+), 145 deletions(-) create mode 100644 kb_lib/src/dex/raydium_clmm.rs diff --git a/ROADMAP.md b/ROADMAP.md index c38f0ac..3d3fe10 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -695,7 +695,49 @@ Réalisé : - prise en charge des candles matérialisées et des candles régénérées à la demande pour un timeframe custom, - intégration du rendu graphique directement dans `Demo Pipeline`. -### 6.057. Version `0.7.25` — `kb_app` : overlays analytiques +### 6.057. Version `0.7.25` — Enrichissement metadata des tokens +Objectif : rendre le catalogue local lisible et exploitable en associant les mints à des métadonnées minimales fiables. + +À faire : + +- ajouter une couche `token_metadata` dans `kb_lib`, distincte du décodage DEX et de l’UI, +- enrichir `kb_tokens` avec `symbol`, `name`, `decimals`, `metadata_uri`, `metadata_source`, `metadata_status` et `metadata_updated_at` lorsque le schéma le permet, +- ajouter une résolution locale des mints connus (`SOL`, `USDC`, `USDT`, `RAY`, `JitoSOL` et autres références stables), +- réutiliser les payloads déjà décodés des événements `pump_fun.create` / `pump_fun.create_v2_token` pour alimenter `name`, `symbol`, `uri` et `creator`, +- ajouter une résolution Metaplex Token Metadata PDA pour les tokens SPL classiques, +- ajouter une résolution Token-2022 via metadata pointer / extensions lorsque le mint utilise ce standard, +- stocker explicitement les cas non résolus afin d’éviter les tentatives répétées inutiles, +- exposer une commande UI ou un service de backfill permettant d’enrichir les tokens déjà présents en base, +- maintenir une politique de priorité claire entre sources : `known_mint`, `pump_fun_create`, `metaplex`, `token_2022`, puis `unresolved`. + +### 6.058. Version `0.7.26` — Validation multi-DEX et non-régression du pipeline +Objectif : vérifier que les connecteurs déjà branchés restent cohérents avant d’ouvrir la phase d’analyse `0.8.x`. + +À faire : + +- rejouer des bases neuves de test pour `pump_fun`, `pump_swap`, `raydium_cpmm` et `raydium_clmm`, +- vérifier pour chaque DEX le triptyque `decoded_event_count / trade_event_count / pair_candle_count`, +- garantir que les événements non pricés ou non candle ne produisent pas de trade event invalide, +- conserver l’enrichissement `eventCategory`, `tradeCandidate`, `candleCandidate`, `liquidityCandidate`, `feeCandidate`, `rewardCandidate`, `adminCandidate` et `poolLifecycleCandidate` dans `payload_json`, +- documenter les familles d’événements utilisées pour les candles et celles conservées seulement pour l’analyse ou la traçabilité, +- ajouter ou compléter les tests unitaires sur `dex_decode`, `dex_detect`, `trade_aggregation`, `pair_candle_aggregation` et `pair_analytic_signal`, +- ajouter des requêtes SQL de diagnostic de référence pour contrôler rapidement les tables clés après backfill, +- conserver la tolérance aux événements DEX partiels tout en refusant les trades sans montant ou prix exploitable. + +### 6.059. Version `0.7.27` — Raydium AMM v4 legacy : corpus et validation ciblée +Objectif : isoler correctement le vrai Raydium AMM v4 historique et le distinguer de `raydium_cpmm` et `raydium_clmm`. + +À faire : + +- rechercher des pools réellement rattachés au programme `675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8`, +- constituer un petit corpus local de signatures/pools AMM v4 fiables pour les tests, +- vérifier que les adresses issues de Dexscreener ou d’autres explorateurs ne sont pas seulement catégorisées globalement comme `Raydium`, +- ajouter des requêtes de diagnostic par `program_id`, `accounts_json` et préfixe `data_json`, +- valider la prise en charge `initialize2` et identifier les instructions de swap AMM v4 à supporter si elles apparaissent dans les transactions testées, +- renommer et stabiliser les fonctions internes autour de `raydium_amm_v4` afin d’éviter l’ambiguïté avec `raydium_cpmm` et `raydium_clmm`, +- documenter les limites connues si le corpus AMM v4 reste trop faible. + +### 6.060. Version `0.7.28` — `kb_app` : overlays analytiques Objectif : rendre visibles les signaux analytiques directement sur les graphes et vues de marché. À faire : @@ -703,20 +745,22 @@ Objectif : rendre visibles les signaux analytiques directement sur les graphes e - afficher les signaux analytiques par bucket au-dessus ou autour des candles, - ajouter des marqueurs pour `first_trade_seen`, `trade_burst_60s`, `buy_sell_imbalance_60s`, `price_jump_up_60s`, `price_jump_down_60s` et `volume_spike_60s`, - permettre le filtrage par type de signal et par sévérité, -- afficher un panneau latéral listant les signaux liés à une paire et à un timeframe. +- afficher un panneau latéral listant les signaux liés à une paire et à un timeframe, +- préparer l’extension future vers des indicateurs Ichimoku, Kumo, projections ABCD et égalités temps/prix sans les mélanger au pipeline de décodage DEX. -### 6.058. Version `0.7.26` — `kb_app` : vues consolidées token / pair / pool +### 6.061. Version `0.7.29` — `kb_app` : vues consolidées token / pair / pool Objectif : fournir une lecture métier plus confortable du modèle `0.7.x`. À faire : -- ajouter une fiche token, -- ajouter une fiche paire, -- ajouter une fiche pool, +- ajouter une fiche token avec mint, programme token, metadata, pools, paires et historique de découverte, +- ajouter une fiche paire avec base/quote, DEX, pool, métriques, candles, signaux et derniers trades, +- ajouter une fiche pool avec composition, vaults, origine, première signature vue, programme DEX et statut de décodage, - relier dans l’UI les launch origins, pool origins, wallets observés, holdings observés, candles et analytic signals, -- préparer une navigation transversale entre objets techniques et objets métier. +- préparer une navigation transversale entre objets techniques et objets métier, +- rendre explicites les cas `tradeCount = null`, `lastPriceQuotePerBase = null` et tokens non enrichis. -### 6.059. Version `0.7.27` — Finition UI `0.7.x` +### 6.062. Version `0.7.30` — Finition UI `0.7.x` Objectif : stabiliser la couche desktop de validation avant l’ouverture de `0.8.x`. À faire : @@ -724,9 +768,10 @@ Objectif : stabiliser la couche desktop de validation avant l’ouverture de `0. - consolider les vues ajoutées dans `kb_app`, - améliorer la navigation, les filtres et la pagination, - ajouter les derniers raffinements de confort et de lisibilité, -- préparer une base UI suffisamment stable pour la future phase d’analyse et filtrage `0.8.x`. +- préparer une base UI suffisamment stable pour la future phase d’analyse et filtrage `0.8.x`, +- vérifier que les commandes Tauri restent de simples façades vers `kb_lib` et ne récupèrent pas de logique métier. -### 6.060. Version `0.7.x` — Couverture DEX v1 +### 6.063. Version `0.7.x` — Couverture DEX v1 Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier. Protocoles cibles : @@ -737,7 +782,9 @@ Protocoles cibles : - LaunchLab / Fun Launch - Pump.fun - PumpSwap -- Raydium +- Raydium AMM v4 legacy +- Raydium CPMM +- Raydium CLMM - Orca - Bags - FluxBeam @@ -750,10 +797,12 @@ Résultat attendu : - résolution des signatures pertinentes, - décodage des transactions utiles, - création d’objets métier riches pour tokens, pools, paires, listings, participants et holdings observés, +- enrichissement metadata des tokens découverts, +- séparation claire entre événements candle/trade et événements utiles seulement à l’analyse, aux frais, à la liquidité, aux rewards ou à l’administration, - préparation d’une détection temps réel hybride et d’un backfill ciblé compatible avec les mêmes objets métier, - préparation d’agrégats DEX plus riches, de candles / OHLCV et d’une UI d’inspection du pipeline `0.7.x`. -### 6.061. Version `0.8.x` — Analyse et filtrage +### 6.064. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -763,9 +812,12 @@ Objectif : transformer les événements bruts en signaux exploitables. - exclusions des tokens non tradables, - statistiques de comportement, - premiers patterns, -- enrichissement des signaux analytiques préparés en fin de `0.7.x`. +- enrichissement des signaux analytiques préparés en fin de `0.7.x`, +- indicateurs graphiques optionnels comme Ichimoku / Kumo, +- outils de sélection manuelle de points ABC et projection d’un point D selon des règles temps/prix explicites, +- séparation stricte entre signaux analytiques observés, projections hypothétiques et décisions de trading. -### 6.062. Version `1.x.y` — Wallets et swap préparatoire +### 6.065. Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -776,7 +828,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.063. Version `2.x.y` — Trading semi-automatisé +### 6.066. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -787,7 +839,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.064. Version `3.x.y` — Yellowstone gRPC +### 6.067. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : @@ -814,6 +866,12 @@ Modules cibles à court terme : - `json_rpc_ws.rs` - `solana_pubsub_ws.rs` - `detect.rs` +- `dex_decode.rs` +- `dex_detect.rs` +- `trade_aggregation.rs` +- `pair_candle_aggregation.rs` +- `pair_analytic_signal.rs` +- `token_metadata.rs` ### 7.2. `kb_app` Responsabilités cibles : @@ -875,10 +933,13 @@ Le projet doit maintenir au minimum : La priorité immédiate est désormais la suivante : -1. poursuivre la fin de série `0.7.x` côté `kb_app` avant l’ouverture de `0.8.x`, -2. ajouter un pilotage UI du backfill historique ciblé par `token_mint`, -3. ajouter une vue graphique des candles / OHLCV avec `echarts`, -4. ajouter les overlays des signaux analytiques sur les candles, -5. consolider les vues métier `token / pair / pool` dans `kb_app`, -6. stabiliser l’ergonomie, les filtres et la navigation de l’UI d’inspection, -7. préparer enfin l’arrivée de Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle existant. +1. ajouter l’enrichissement metadata des tokens afin que le catalogue affiche au minimum les symboles/noms connus et les métadonnées résolues, +2. rejouer une campagne de validation multi-DEX sur bases neuves pour `pump_fun`, `pump_swap`, `raydium_cpmm` et `raydium_clmm`, +3. constituer un corpus ciblé pour `raydium_amm_v4` legacy au lieu de s’appuyer sur des labels Raydium trop génériques, +4. conserver les événements non-candle enrichis en payload pour l’analyse future, sans créer de trades invalides, +5. ajouter les overlays des signaux analytiques sur les candles, +6. consolider les vues métier `token / pair / pool` dans `kb_app`, +7. stabiliser l’ergonomie, les filtres et la navigation de l’UI d’inspection, +8. préparer ensuite l’ouverture de `0.8.x` pour l’analyse, les filtres, les patterns et les projections graphiques, +9. préparer enfin Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle HTTP / WS existant. + diff --git a/kb_lib/src/dex.rs b/kb_lib/src/dex.rs index 44a635f..570897c 100644 --- a/kb_lib/src/dex.rs +++ b/kb_lib/src/dex.rs @@ -11,6 +11,7 @@ mod orca_whirlpools; mod pump_fun; mod pump_swap; mod raydium_amm_v4; +mod raydium_clmm; mod raydium_cpmm; pub use dexlab::KB_DEXLAB_PROGRAM_ID; @@ -56,6 +57,10 @@ pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent; pub use raydium_amm_v4::KbRaydiumAmmV4Decoder; pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded; +pub use raydium_clmm::KB_RAYDIUM_CLMM_PROGRAM_ID; +pub use raydium_clmm::KbRaydiumClmmDecodedEvent; +pub use raydium_clmm::KbRaydiumClmmSwapV2Decoded; +pub use raydium_clmm::kb_decode_raydium_clmm_instruction; pub use raydium_cpmm::KB_RAYDIUM_CPMM_PROGRAM_ID; pub use raydium_cpmm::KbRaydiumCpmmDecodedEvent; pub use raydium_cpmm::KbRaydiumCpmmSwapDecoded; diff --git a/kb_lib/src/dex/raydium_clmm.rs b/kb_lib/src/dex/raydium_clmm.rs new file mode 100644 index 0000000..24e9157 --- /dev/null +++ b/kb_lib/src/dex/raydium_clmm.rs @@ -0,0 +1,500 @@ +// file: kb_lib/src/dex/raydium_clmm.rs + +//! Raydium CLMM instruction decoder. + +/// Raydium CLMM program id. +pub const KB_RAYDIUM_CLMM_PROGRAM_ID: &str = "CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK"; + +const KB_RAYDIUM_CLMM_SWAP_V2_DISCRIMINATOR: [u8; 8] = [43, 4, 237, 11, 26, 201, 30, 98]; + +const KB_RAYDIUM_CLMM_SWAP_LEGACY_DISCRIMINATOR: [u8; 8] = [248, 198, 158, 145, 225, 117, 135, 200]; + +/// Decoded Raydium CLMM event. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] +pub enum KbRaydiumClmmDecodedEvent { + /// Raydium CLMM swap_v2 event. + SwapV2(crate::KbRaydiumClmmSwapV2Decoded), +} + +impl KbRaydiumClmmDecodedEvent { + /// Returns the normalized event kind. + pub fn event_kind(&self) -> &'static str { + match self { + crate::KbRaydiumClmmDecodedEvent::SwapV2(_) => "raydium_clmm.swap_v2", + } + } + + /// Returns the pool account. + pub fn pool_account(&self) -> &str { + match self { + crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.pool_state.as_str(), + } + } + + /// Returns the normalized base mint. + pub fn base_mint(&self) -> &str { + match self { + crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.base_mint.as_str(), + } + } + + /// Returns the normalized quote mint. + pub fn quote_mint(&self) -> &str { + match self { + crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.quote_mint.as_str(), + } + } + + /// Converts the decoded event to JSON payload. + pub fn to_payload_json(&self) -> std::option::Option { + match self { + crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => { + let result = serde_json::to_string(event); + match result { + Ok(payload_json) => Some(payload_json), + Err(_) => None, + } + } + } + } +} + +/// Decoded Raydium CLMM swap_v2 instruction. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] +pub struct KbRaydiumClmmSwapV2Decoded { + /// User performing the swap. + pub payer: std::string::String, + /// AMM config account. + pub amm_config: std::string::String, + /// CLMM pool state account. + pub pool_state: std::string::String, + /// User input token account. + pub input_token_account: std::string::String, + /// User output token account. + pub output_token_account: std::string::String, + /// Pool input vault. + pub input_vault: std::string::String, + /// Pool output vault. + pub output_vault: std::string::String, + /// Pool oracle observation state. + pub observation_state: std::string::String, + /// Input vault mint. + pub input_vault_mint: std::string::String, + /// Output vault mint. + pub output_vault_mint: std::string::String, + /// Canonical base mint. + pub base_mint: std::string::String, + /// Canonical quote mint. + pub quote_mint: std::string::String, + /// Canonical base vault. + pub base_vault: std::string::String, + /// Canonical quote vault. + pub quote_vault: std::string::String, + /// Trade side relative to the canonical base mint. + #[serde(rename = "tradeSide")] + pub trade_side: std::string::String, + /// Amount argument. + pub amount: u64, + /// Other amount threshold argument. + pub other_amount_threshold: u64, + /// Sqrt price limit as decimal string. + pub sqrt_price_limit_x64: std::string::String, + /// Whether the instruction uses exact input mode. + pub is_base_input: bool, +} + +/// Decodes a Raydium CLMM instruction. +pub fn kb_decode_raydium_clmm_instruction( + accounts_json: &str, + data_json: &str, +) -> std::vec::Vec { + let mut decoded = std::vec::Vec::new(); + let accounts_result = serde_json::from_str::>(accounts_json); + let accounts = match accounts_result { + Ok(accounts) => accounts, + Err(_) => return decoded, + }; + let data_base58_result = serde_json::from_str::(data_json); + let data_base58 = match data_base58_result { + Ok(data_base58) => data_base58, + Err(_) => data_json.to_string(), + }; + let data_option = kb_decode_base58(data_base58.as_str()); + let data = match data_option { + Some(data) => data, + None => return decoded, + }; + if data.len() < 41 { + return decoded; + } + let discriminator_option = kb_read_discriminator(data.as_slice()); + let discriminator = match discriminator_option { + Some(discriminator) => discriminator, + None => return decoded, + }; + if discriminator == KB_RAYDIUM_CLMM_SWAP_LEGACY_DISCRIMINATOR { + return decoded; + } + if discriminator != KB_RAYDIUM_CLMM_SWAP_V2_DISCRIMINATOR { + return decoded; + } + let event_option = kb_decode_swap_v2(accounts.as_slice(), data.as_slice()); + let event = match event_option { + Some(event) => event, + None => return decoded, + }; + decoded.push(crate::KbRaydiumClmmDecodedEvent::SwapV2(event)); + decoded +} + +fn kb_decode_swap_v2( + accounts: &[std::string::String], + data: &[u8], +) -> std::option::Option { + let payer = match kb_clone_account(accounts, 0) { + Some(value) => value, + None => return None, + }; + let amm_config = match kb_clone_account(accounts, 1) { + Some(value) => value, + None => return None, + }; + let pool_state = match kb_clone_account(accounts, 2) { + Some(value) => value, + None => return None, + }; + let input_token_account = match kb_clone_account(accounts, 3) { + Some(value) => value, + None => return None, + }; + let output_token_account = match kb_clone_account(accounts, 4) { + Some(value) => value, + None => return None, + }; + let input_vault = match kb_clone_account(accounts, 5) { + Some(value) => value, + None => return None, + }; + let output_vault = match kb_clone_account(accounts, 6) { + Some(value) => value, + None => return None, + }; + let observation_state = match kb_clone_account(accounts, 7) { + Some(value) => value, + None => return None, + }; + let input_vault_mint = match kb_clone_account(accounts, 11) { + Some(value) => value, + None => return None, + }; + let output_vault_mint = match kb_clone_account(accounts, 12) { + Some(value) => value, + None => return None, + }; + let amount = match kb_read_u64_le(data, 8) { + Some(value) => value, + None => return None, + }; + let other_amount_threshold = match kb_read_u64_le(data, 16) { + Some(value) => value, + None => return None, + }; + let sqrt_price_limit_x64 = match kb_read_u128_le(data, 24) { + Some(value) => value, + None => return None, + }; + let is_base_input = match kb_read_bool(data, 40) { + Some(value) => value, + None => return None, + }; + let mut base_mint = input_vault_mint.clone(); + let mut quote_mint = output_vault_mint.clone(); + let mut base_vault = input_vault.clone(); + let mut quote_vault = output_vault.clone(); + let mut trade_side = "SellBase".to_string(); + if output_vault_mint.as_str() < input_vault_mint.as_str() { + base_mint = output_vault_mint.clone(); + quote_mint = input_vault_mint.clone(); + base_vault = output_vault.clone(); + quote_vault = input_vault.clone(); + trade_side = "BuyBase".to_string(); + } + Some(crate::KbRaydiumClmmSwapV2Decoded { + payer, + amm_config, + pool_state, + input_token_account, + output_token_account, + input_vault, + output_vault, + observation_state, + input_vault_mint, + output_vault_mint, + base_mint, + quote_mint, + base_vault, + quote_vault, + trade_side, + amount, + other_amount_threshold, + sqrt_price_limit_x64: sqrt_price_limit_x64.to_string(), + is_base_input, + }) +} + +fn kb_clone_account( + accounts: &[std::string::String], + index: usize, +) -> std::option::Option { + let account_option = accounts.get(index); + match account_option { + Some(account) => Some(account.clone()), + None => None, + } +} + +fn kb_read_discriminator(data: &[u8]) -> std::option::Option<[u8; 8]> { + if data.len() < 8 { + return None; + } + let mut bytes = [0_u8; 8]; + let mut index = 0_usize; + while index < 8 { + bytes[index] = data[index]; + index += 1; + } + Some(bytes) +} + +fn kb_read_u64_le(data: &[u8], offset: usize) -> std::option::Option { + if data.len() < offset + 8 { + return None; + } + let mut bytes = [0_u8; 8]; + let mut index = 0_usize; + while index < 8 { + bytes[index] = data[offset + index]; + index += 1; + } + Some(u64::from_le_bytes(bytes)) +} + +fn kb_read_u128_le(data: &[u8], offset: usize) -> std::option::Option { + if data.len() < offset + 16 { + return None; + } + let mut bytes = [0_u8; 16]; + let mut index = 0_usize; + while index < 16 { + bytes[index] = data[offset + index]; + index += 1; + } + Some(u128::from_le_bytes(bytes)) +} + +fn kb_read_bool(data: &[u8], offset: usize) -> std::option::Option { + if data.len() <= offset { + return None; + } + match data[offset] { + 0 => Some(false), + 1 => Some(true), + _ => None, + } +} + +fn kb_decode_base58(input: &str) -> std::option::Option> { + let alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz".as_bytes(); + let mut bytes: std::vec::Vec = std::vec::Vec::new(); + for input_byte in input.bytes() { + let mut value_option = None; + let mut alphabet_index = 0_usize; + while alphabet_index < alphabet.len() { + if alphabet[alphabet_index] == input_byte { + value_option = Some(alphabet_index as u32); + break; + } + alphabet_index += 1; + } + let mut carry = match value_option { + Some(value) => value, + None => return None, + }; + let mut byte_index = bytes.len(); + while byte_index > 0 { + byte_index -= 1; + let value = (bytes[byte_index] as u32) * 58 + carry; + bytes[byte_index] = (value & 0xff) as u8; + carry = value >> 8; + } + while carry > 0 { + bytes.insert(0, (carry & 0xff) as u8); + carry >>= 8; + } + } + let mut leading_zero_count = 0_usize; + for input_byte in input.bytes() { + if input_byte == b'1' { + leading_zero_count += 1; + } else { + break; + } + } + let mut result = std::vec::Vec::new(); + let mut index = 0_usize; + while index < leading_zero_count { + result.push(0_u8); + index += 1; + } + for byte in bytes { + result.push(byte); + } + Some(result) +} + +#[cfg(test)] +mod tests { + fn sample_swap_v2_accounts_json() -> &'static str { + r#"[ + "8NQ32SyFKD1d5kenq4oM8Da6C6J9TQSMW1uAgFRveEQr", + "A1BBtTYJd4i3xU8D6Tc2FzU6ZN4oXZWXKZnCxwbHXr8x", + "GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR", + "D2frZyyQ7NQaXRiEoBUM9S64Ckv7KZ7wuqupqdMhpsHy", + "H7qe6sAyEyqztyMtRrDf5J1gugLx6yuyKPy5veVmR14W", + "AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq", + "CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh", + "8QtFSxNzD3zmEX8nzQKZB83TH4WGUAkLkQoRHAw5fuhn", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb", + "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr", + "CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV", + "7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs", + "8ovxZR2Gv9Mr73aoXLQYTMaZvHCSpnEohzgjVHQwmyHr", + "9MssDxndh2Rn8DmGWL94hXVv22zxfDYHV7tvzfPgcaWe" + ]"# + } + + #[test] + fn decodes_swap_v2() { + let events = crate::kb_decode_raydium_clmm_instruction( + sample_swap_v2_accounts_json(), + r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#, + ); + assert_eq!(events.len(), 1); + match &events[0] { + crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => { + assert_eq!(events[0].event_kind(), "raydium_clmm.swap_v2"); + assert_eq!( + events[0].pool_account(), + "GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR" + ); + assert_eq!( + event.pool_state, + "GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR" + ); + assert_eq!( + event.input_vault, + "AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq" + ); + assert_eq!( + event.output_vault, + "CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh" + ); + assert_eq!( + event.input_vault_mint, + "CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV" + ); + assert_eq!( + event.output_vault_mint, + "7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs" + ); + assert_eq!( + event.base_mint, + "7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs" + ); + assert_eq!( + event.quote_mint, + "CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV" + ); + assert_eq!( + event.base_vault, + "CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh" + ); + assert_eq!( + event.quote_vault, + "AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq" + ); + assert_eq!(event.trade_side, "BuyBase"); + assert_eq!(event.amount, 148441657491969); + assert_eq!(event.other_amount_threshold, 0); + assert_eq!(event.sqrt_price_limit_x64, "0"); + assert_eq!(event.is_base_input, true); + } + } + } + + #[test] + fn serializes_swap_v2_payload_json() { + let events = crate::kb_decode_raydium_clmm_instruction( + sample_swap_v2_accounts_json(), + r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#, + ); + assert_eq!(events.len(), 1); + let payload_option = events[0].to_payload_json(); + let payload = match payload_option { + Some(payload) => payload, + None => panic!("payload json must be available"), + }; + assert!(payload.contains("GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR")); + assert!(payload.contains("input_vault")); + assert!(payload.contains("output_vault")); + assert!(payload.contains("tradeSide")); + } + + #[test] + fn ignores_invalid_data() { + let events = crate::kb_decode_raydium_clmm_instruction( + sample_swap_v2_accounts_json(), + r#""not-base58-data-0""#, + ); + assert_eq!(events.len(), 0); + } + + #[test] + fn ignores_incomplete_accounts() { + let accounts_json = r#"[ + "8NQ32SyFKD1d5kenq4oM8Da6C6J9TQSMW1uAgFRveEQr", + "A1BBtTYJd4i3xU8D6Tc2FzU6ZN4oXZWXKZnCxwbHXr8x", + "GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR" + ]"#; + let events = crate::kb_decode_raydium_clmm_instruction( + accounts_json, + r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#, + ); + assert_eq!(events.len(), 0); + } + + #[test] + fn ignores_legacy_swap_for_now() { + let mut data = std::vec::Vec::::new(); + data.push(248); + data.push(198); + data.push(158); + data.push(145); + data.push(225); + data.push(117); + data.push(135); + data.push(200); + while data.len() < 41 { + data.push(0); + } + data[40] = 1; + let encoded = bs58::encode(data).into_string(); + let data_json = format!("\"{}\"", encoded); + let events = crate::kb_decode_raydium_clmm_instruction( + sample_swap_v2_accounts_json(), + data_json.as_str(), + ); + assert_eq!(events.len(), 0); + } +} diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs index 6e755c4..f2db75d 100644 --- a/kb_lib/src/dex_decode.rs +++ b/kb_lib/src/dex_decode.rs @@ -37,6 +37,42 @@ impl KbDexDecodeService { } } + async fn decode_and_persist_raydium_clmm_events( + &self, + transaction: &crate::KbChainTransactionDto, + instructions: &[crate::KbChainInstructionDto], + ) -> Result, crate::KbError> { + let mut persisted = std::vec::Vec::new(); + for instruction in instructions { + let program_id = match instruction.program_id.as_ref() { + Some(program_id) => program_id, + None => continue, + }; + if program_id.as_str() != crate::KB_RAYDIUM_CLMM_PROGRAM_ID { + continue; + } + let data_json = match instruction.data_json.as_ref() { + Some(data_json) => data_json, + None => continue, + }; + let decoded_events = crate::kb_decode_raydium_clmm_instruction( + instruction.accounts_json.as_str(), + data_json.as_str(), + ); + for decoded_event in &decoded_events { + let persist_result = self + .persist_raydium_clmm_event(transaction, instruction, decoded_event) + .await; + let persisted_event = match persist_result { + Ok(persisted_event) => persisted_event, + Err(error) => return Err(error), + }; + persisted.push(persisted_event); + } + } + Ok(persisted) + } + /// Decodes one projected transaction and persists the decoded events. pub async fn decode_transaction_by_signature( &self, @@ -104,6 +140,16 @@ impl KbDexDecodeService { for persisted_event in raydium_cpmm_persisted { persisted.push(persisted_event); } + let raydium_clmm_persisted_result = self + .decode_and_persist_raydium_clmm_events(&transaction, &instructions) + .await; + let raydium_clmm_persisted = match raydium_clmm_persisted_result { + Ok(raydium_clmm_persisted) => raydium_clmm_persisted, + Err(error) => return Err(error), + }; + for persisted_event in raydium_clmm_persisted { + persisted.push(persisted_event); + } let pump_fun_decoded_result = self .pump_fun_decoder .decode_transaction(&transaction, &instructions); @@ -248,15 +294,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbDexlabDecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "dexlab", + "dexlab.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded dexlab payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -342,15 +387,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbDexlabDecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "dexlab", + "dexlab.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded dexlab payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -445,15 +489,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbFluxbeamDecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "fluxbeam", + "fluxbeam.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded fluxbeam payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -539,15 +582,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbFluxbeamDecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "fluxbeam", + "fluxbeam.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded fluxbeam payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -642,15 +684,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbOrcaWhirlpoolsDecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "orca_whirlpools", + "orca_whirlpools.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded orca whirlpools payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -736,15 +777,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbOrcaWhirlpoolsDecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "orca_whirlpools", + "orca_whirlpools.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded orca whirlpools payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -840,15 +880,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbMeteoraDammV1DecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_damm_v1", + "meteora_damm_v1.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora damm v1 payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -934,15 +973,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbMeteoraDammV1DecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_damm_v1", + "meteora_damm_v1.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora damm v1 payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1037,15 +1075,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbMeteoraDammV2DecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_damm_v2", + "meteora_damm_v2.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora damm v2 payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1131,15 +1168,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbMeteoraDammV2DecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_damm_v2", + "meteora_damm_v2.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora damm v2 payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1234,15 +1270,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbMeteoraDbcDecodedEvent::CreatePool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_dbc", + "meteora_dbc.create_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora dbc payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1328,15 +1363,14 @@ impl KbDexDecodeService { Ok(fetched) } crate::KbMeteoraDbcDecodedEvent::Swap(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "meteora_dbc", + "meteora_dbc.swap", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded meteora dbc payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1431,15 +1465,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbRaydiumAmmV4DecodedEvent::Initialize2Pool(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "raydium_amm_v4", + "raydium_amm_v4.initialize2_pool", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded raydium payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1563,6 +1596,141 @@ impl KbDexDecodeService { Ok(persisted) } + async fn persist_raydium_clmm_event( + &self, + transaction: &crate::KbChainTransactionDto, + instruction: &crate::KbChainInstructionDto, + decoded_event: &crate::KbRaydiumClmmDecodedEvent, + ) -> Result { + let transaction_id = match transaction.id { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "transaction '{}' has no internal id", + transaction.signature + ))); + } + }; + let instruction_id = match instruction.id { + Some(instruction_id) => instruction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "raydium clmm instruction for transaction '{}' has no internal id", + transaction.signature + ))); + } + }; + let event_kind = decoded_event.event_kind().to_string(); + let raw_payload_json = match decoded_event.to_payload_json() { + Some(payload_json) => payload_json, + None => { + return Err(crate::KbError::Json( + "cannot serialize decoded raydium clmm payload".to_string(), + )); + } + }; + let payload_json_result = kb_enrich_serialized_dex_decoded_payload( + "raydium_clmm", + event_kind.as_str(), + raw_payload_json.as_str(), + ); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => return Err(error), + }; + let existing_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + transaction_id, + Some(instruction_id), + event_kind.as_str(), + ) + .await; + let existing_option = match existing_result { + Ok(existing_option) => existing_option, + Err(error) => return Err(error), + }; + let already_present = existing_option.is_some(); + let dto = crate::KbDexDecodedEventDto::new( + transaction_id, + Some(instruction_id), + "raydium_clmm".to_string(), + crate::KB_RAYDIUM_CLMM_PROGRAM_ID.to_string(), + event_kind.clone(), + Some(decoded_event.pool_account().to_string()), + None, + Some(decoded_event.base_mint().to_string()), + Some(decoded_event.quote_mint().to_string()), + None, + payload_json.clone(), + ); + let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await; + if let Err(error) = upsert_result { + return Err(error); + } + let fetched_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + transaction_id, + Some(instruction_id), + event_kind.as_str(), + ) + .await; + let fetched_option = match fetched_result { + Ok(fetched_option) => fetched_option, + Err(error) => return Err(error), + }; + let fetched = match fetched_option { + Some(fetched) => fetched, + None => { + return Err(crate::KbError::InvalidState( + "decoded raydium clmm event disappeared after upsert".to_string(), + )); + } + }; + if !already_present { + let payload_value_result = + serde_json::from_str::(payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse raydium clmm payload after serialization: {}", + error + ))); + } + }; + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + format!("dex.{}", event_kind), + crate::KbObservationSourceKind::HttpRpc, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload_value.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + format!("signal.dex.{}", event_kind), + crate::KbAnalysisSignalSeverity::Low, + transaction.signature.clone(), + Some(observation_id), + None, + payload_value, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(fetched) + } + async fn persist_raydium_cpmm_event( &self, transaction: &crate::KbChainTransactionDto, @@ -1587,7 +1755,8 @@ impl KbDexDecodeService { ))); } }; - let payload_json = match decoded_event.to_payload_json() { + let event_kind = decoded_event.event_kind().to_string(); + let raw_payload_json = match decoded_event.to_payload_json() { Some(payload_json) => payload_json, None => { return Err(crate::KbError::Json( @@ -1595,7 +1764,15 @@ impl KbDexDecodeService { )); } }; - let event_kind = decoded_event.event_kind().to_string(); + let payload_json_result = kb_enrich_serialized_dex_decoded_payload( + "raydium_cpmm", + event_kind.as_str(), + raw_payload_json.as_str(), + ); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => return Err(error), + }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), transaction_id, @@ -1696,15 +1873,14 @@ impl KbDexDecodeService { ) -> Result { match decoded_event { crate::KbPumpFunDecodedEvent::CreateV2Token(event) => { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "pump_fun", + "pump_fun.create_v2_token", + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded pump.fun payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1820,15 +1996,14 @@ impl KbDexDecodeService { signal_kind: &str, observation_kind: &str, ) -> Result { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "pump_fun", + event_kind, + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded pump.fun trade payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -1950,15 +2125,14 @@ impl KbDexDecodeService { signal_kind: &str, observation_kind: &str, ) -> Result { - let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload( + "pump_swap", + event_kind, + event.payload_json.clone(), + ); let payload_json = match payload_json_result { Ok(payload_json) => payload_json, - Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot serialize decoded pump swap payload: {}", - error - ))); - } + Err(error) => return Err(error), }; let existing_result = crate::get_dex_decoded_event_by_key( self.database.as_ref(), @@ -2044,8 +2218,258 @@ impl KbDexDecodeService { } } +// Classifies a DEX event kind into a stable business category. +fn kb_classify_dex_event_category(event_kind: &str) -> &'static str { + if kb_is_dex_reward_event_kind(event_kind) { + return "reward"; + } + if kb_is_dex_fee_event_kind(event_kind) { + return "fee"; + } + if kb_is_dex_liquidity_event_kind(event_kind) { + return "liquidity"; + } + if kb_is_dex_pool_lifecycle_event_kind(event_kind) { + return "pool_lifecycle"; + } + if kb_is_dex_admin_event_kind(event_kind) { + return "admin"; + } + if kb_is_dex_trade_event_kind(event_kind) { + return "trade"; + } + "unknown" +} + +// Returns true when the event kind represents a swap-like event. +fn kb_is_dex_trade_event_kind(event_kind: &str) -> bool { + if event_kind.ends_with(".buy") { + return true; + } + if event_kind.ends_with(".sell") { + return true; + } + if event_kind.ends_with(".swap") { + return true; + } + if event_kind.contains(".swap_") { + return true; + } + false +} + +// Returns true when the event kind can directly produce a candle candidate. +fn kb_is_dex_candle_candidate_event_kind(event_kind: &str) -> bool { + if event_kind.contains("router") { + return false; + } + if event_kind.contains("route") { + return false; + } + kb_is_dex_trade_event_kind(event_kind) +} + +// Returns true for liquidity lifecycle changes that must not become candles. +fn kb_is_dex_liquidity_event_kind(event_kind: &str) -> bool { + if event_kind.contains(".deposit") { + return true; + } + if event_kind.contains(".withdraw") { + return true; + } + if event_kind.contains(".increase_liquidity") { + return true; + } + if event_kind.contains(".decrease_liquidity") { + return true; + } + if event_kind.contains(".open_position") { + return true; + } + if event_kind.contains(".close_position") { + return true; + } + false +} + +// Returns true for fee collection events. +fn kb_is_dex_fee_event_kind(event_kind: &str) -> bool { + if event_kind.contains("collect_creator_fee") { + return true; + } + if event_kind.contains("collect_protocol_fee") { + return true; + } + if event_kind.contains("collect_fund_fee") { + return true; + } + if event_kind.contains("collect_fee") { + return true; + } + false +} + +// Returns true for reward/incentive events. +fn kb_is_dex_reward_event_kind(event_kind: &str) -> bool { + if event_kind.contains("reward") { + return true; + } + if event_kind.contains("emission") { + return true; + } + false +} + +// Returns true for pool creation / initialization / migration events. +fn kb_is_dex_pool_lifecycle_event_kind(event_kind: &str) -> bool { + if event_kind.contains(".initialize") { + return true; + } + if event_kind.contains(".initialize_with_permission") { + return true; + } + if event_kind.contains(".create_pool") { + return true; + } + if event_kind.contains(".create_v2_token") { + return true; + } + if event_kind.contains(".migrate") { + return true; + } + false +} + +// Returns true for admin/config/permission changes. +fn kb_is_dex_admin_event_kind(event_kind: &str) -> bool { + if event_kind.contains("admin") { + return true; + } + if event_kind.contains("config") { + return true; + } + if event_kind.contains("permission") { + return true; + } + if event_kind.contains("set_") { + return true; + } + if event_kind.contains("update_") { + return true; + } + false +} + +// Enriches a decoded payload with non-destructive classification metadata. +fn kb_enrich_dex_decoded_payload( + protocol_name: &str, + event_kind: &str, + payload_json: serde_json::Value, +) -> serde_json::Value { + let event_category = kb_classify_dex_event_category(event_kind); + let trade_candidate = kb_is_dex_trade_event_kind(event_kind); + let candle_candidate = kb_is_dex_candle_candidate_event_kind(event_kind); + let mut object = match payload_json { + serde_json::Value::Object(object) => object, + other => { + let mut object = serde_json::Map::new(); + object.insert("rawPayload".to_owned(), other); + object + } + }; + kb_json_insert_string_if_missing(&mut object, "protocolName", protocol_name); + kb_json_insert_string_if_missing(&mut object, "eventKind", event_kind); + kb_json_insert_string_if_missing(&mut object, "eventCategory", event_category); + kb_json_insert_bool_if_missing(&mut object, "tradeCandidate", trade_candidate); + kb_json_insert_bool_if_missing(&mut object, "candleCandidate", candle_candidate); + kb_json_insert_i64_if_missing(&mut object, "eventClassificationVersion", 1); + if !trade_candidate { + kb_json_insert_string_if_missing(&mut object, "skipTradeReason", "non_trade_event"); + } else if !candle_candidate { + kb_json_insert_string_if_missing( + &mut object, + "skipCandleReason", + "route_or_multihop_event_requires_leg_resolution", + ); + } + serde_json::Value::Object(object) +} + +// Inserts a string JSON property without overriding existing decoded data. +fn kb_json_insert_string_if_missing( + object: &mut serde_json::Map, + key: &str, + value: &str, +) { + if object.contains_key(key) { + return; + } + object.insert(key.to_owned(), serde_json::Value::String(value.to_owned())); +} + +// Inserts a bool JSON property without overriding existing decoded data. +fn kb_json_insert_bool_if_missing( + object: &mut serde_json::Map, + key: &str, + value: bool, +) { + if object.contains_key(key) { + return; + } + object.insert(key.to_owned(), serde_json::Value::Bool(value)); +} + +// Inserts an i64 JSON property without overriding existing decoded data. +fn kb_json_insert_i64_if_missing( + object: &mut serde_json::Map, + key: &str, + value: i64, +) { + if object.contains_key(key) { + return; + } + object.insert( + key.to_owned(), + serde_json::Value::Number(serde_json::Number::from(value)), + ); +} +fn kb_enrich_and_serialize_dex_decoded_payload( + protocol_name: &str, + event_kind: &str, + payload_json: serde_json::Value, +) -> Result { + let enriched_payload = kb_enrich_dex_decoded_payload(protocol_name, event_kind, payload_json); + let payload_json_result = serde_json::to_string(&enriched_payload); + match payload_json_result { + Ok(payload_json) => Ok(payload_json), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize enriched decoded payload for '{}': {}", + event_kind, error + ))), + } +} + +fn kb_enrich_serialized_dex_decoded_payload( + protocol_name: &str, + event_kind: &str, + payload_json: &str, +) -> Result { + let payload_value_result = serde_json::from_str::(payload_json); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse decoded payload for '{}': {}", + event_kind, error + ))); + } + }; + kb_enrich_and_serialize_dex_decoded_payload(protocol_name, event_kind, payload_value) +} + #[cfg(test)] mod tests { + async fn make_database() -> std::sync::Arc { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { @@ -2836,4 +3260,151 @@ mod tests { Some("So11111111111111111111111111111111111111112".to_string()) ); } + + #[test] + fn classifies_swap_events_as_trade_candidates() { + assert_eq!( + super::kb_classify_dex_event_category("raydium_cpmm.swap_base_input"), + "trade" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_cpmm.swap_base_output"), + "trade" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.swap"), + "trade" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.swap_v2"), + "trade" + ); + assert_eq!( + super::kb_classify_dex_event_category("pump_fun.buy"), + "trade" + ); + assert!(super::kb_is_dex_trade_event_kind( + "raydium_cpmm.swap_base_input" + )); + assert!(super::kb_is_dex_candle_candidate_event_kind( + "raydium_cpmm.swap_base_input" + )); + } + + #[test] + fn classifies_router_swap_as_trade_but_not_direct_candle_candidate() { + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.swap_router_base_in"), + "trade" + ); + assert!(super::kb_is_dex_trade_event_kind( + "raydium_clmm.swap_router_base_in" + )); + assert!(!super::kb_is_dex_candle_candidate_event_kind( + "raydium_clmm.swap_router_base_in" + )); + } + + #[test] + fn classifies_fee_reward_liquidity_and_lifecycle_events() { + assert_eq!( + super::kb_classify_dex_event_category("raydium_cpmm.collect_creator_fee"), + "fee" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.collect_protocol_fee"), + "fee" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.set_reward_params"), + "reward" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_clmm.increase_liquidity_v2"), + "liquidity" + ); + assert_eq!( + super::kb_classify_dex_event_category("raydium_cpmm.initialize"), + "pool_lifecycle" + ); + } + + #[test] + fn enriches_payload_without_overriding_existing_fields() { + let payload_json = serde_json::json!({ + "eventCategory": "custom", + "amountIn": "10" + }); + let enriched_payload = super::kb_enrich_dex_decoded_payload( + "raydium_cpmm", + "raydium_cpmm.swap_base_input", + payload_json, + ); + let object_option = enriched_payload.as_object(); + let object = match object_option { + Some(object) => object, + None => { + panic!("expected enriched payload object"); + } + }; + assert_eq!( + object.get("eventCategory"), + Some(&serde_json::Value::String("custom".to_owned())) + ); + assert_eq!( + object.get("protocolName"), + Some(&serde_json::Value::String("raydium_cpmm".to_owned())) + ); + assert_eq!( + object.get("eventKind"), + Some(&serde_json::Value::String( + "raydium_cpmm.swap_base_input".to_owned() + )) + ); + assert_eq!( + object.get("tradeCandidate"), + Some(&serde_json::Value::Bool(true)) + ); + assert_eq!( + object.get("candleCandidate"), + Some(&serde_json::Value::Bool(true)) + ); + } + + #[test] + fn enriches_non_object_payload_as_raw_payload() { + let payload_json = serde_json::Value::String("raw".to_owned()); + let enriched_payload = super::kb_enrich_dex_decoded_payload( + "raydium_clmm", + "raydium_clmm.collect_protocol_fee", + payload_json, + ); + let object_option = enriched_payload.as_object(); + let object = match object_option { + Some(object) => object, + None => { + panic!("expected enriched payload object"); + } + }; + assert_eq!( + object.get("rawPayload"), + Some(&serde_json::Value::String("raw".to_owned())) + ); + assert_eq!( + object.get("eventCategory"), + Some(&serde_json::Value::String("fee".to_owned())) + ); + assert_eq!( + object.get("tradeCandidate"), + Some(&serde_json::Value::Bool(false)) + ); + assert_eq!( + object.get("candleCandidate"), + Some(&serde_json::Value::Bool(false)) + ); + assert_eq!( + object.get("skipTradeReason"), + Some(&serde_json::Value::String("non_trade_event".to_owned())) + ); + } } diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs index f8d5a4b..82a2fed 100644 --- a/kb_lib/src/dex_detect.rs +++ b/kb_lib/src/dex_detect.rs @@ -106,6 +106,18 @@ impl KbDexDetectService { }; detection_results.push(detect_result); } + if decoded_event.protocol_name == "raydium_clmm" + && decoded_event.event_kind == "raydium_clmm.swap_v2" + { + let detect_result = self + .detect_raydium_clmm_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } if decoded_event.protocol_name == "pump_fun" && decoded_event.event_kind == "pump_fun.create_v2_token" { @@ -2747,6 +2759,214 @@ impl KbDexDetectService { created_listing, }) } + + async fn detect_raydium_clmm_trade( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbDexDecodedEventDto, + ) -> Result { + let decoded_event_id = match decoded_event.id { + Some(decoded_event_id) => decoded_event_id, + None => { + return Err(crate::KbError::InvalidState( + "decoded dex event has no internal id".to_string(), + )); + } + }; + let dex_id_result = self.ensure_raydium_clmm_dex().await; + let dex_id = match dex_id_result { + Ok(dex_id) => dex_id, + Err(error) => return Err(error), + }; + let pool_address = match decoded_event.pool_account.clone() { + Some(pool_address) => pool_address, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no pool_account", + decoded_event_id + ))); + } + }; + let base_mint = match decoded_event.token_a_mint.clone() { + Some(base_mint) => base_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_a_mint", + decoded_event_id + ))); + } + }; + let quote_mint = match decoded_event.token_b_mint.clone() { + Some(quote_mint) => quote_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_b_mint", + decoded_event_id + ))); + } + }; + let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => return Err(error), + }; + let base_vault_address = kb_extract_payload_string_field(&payload_value, "base_vault"); + let quote_vault_address = kb_extract_payload_string_field(&payload_value, "quote_vault"); + let base_token_id_result = self.ensure_token(base_mint.as_str()).await; + let base_token_id = match base_token_id_result { + Ok(base_token_id) => base_token_id, + Err(error) => return Err(error), + }; + let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; + let quote_token_id = match quote_token_id_result { + Ok(quote_token_id) => quote_token_id, + Err(error) => return Err(error), + }; + let existing_pool_result = + crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; + let existing_pool_option = match existing_pool_result { + Ok(existing_pool_option) => existing_pool_option, + Err(error) => return Err(error), + }; + let created_pool = existing_pool_option.is_none(); + let pool_id = match existing_pool_option { + Some(pool) => match pool.id { + Some(pool_id) => pool_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pool '{}' has no internal id", + pool.address + ))); + } + }, + None => { + let pool_dto = crate::KbPoolDto::new( + dex_id, + pool_address.clone(), + crate::KbPoolKind::Clmm, + crate::KbPoolStatus::Active, + ); + let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + match upsert_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + } + } + }; + let existing_pair_result = + crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_pair_option = match existing_pair_result { + Ok(existing_pair_option) => existing_pair_option, + Err(error) => return Err(error), + }; + + let created_pair = existing_pair_option.is_none(); + let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); + + let pair_dto = + crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol); + let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + let pair_id = match pair_id_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), + }; + let upsert_base_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + base_token_id, + crate::KbPoolTokenRole::Base, + base_vault_address, + Some(0), + ), + ) + .await; + if let Err(error) = upsert_base_pool_token_result { + return Err(error); + } + let upsert_quote_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + quote_token_id, + crate::KbPoolTokenRole::Quote, + quote_vault_address, + Some(1), + ), + ) + .await; + if let Err(error) = upsert_quote_pool_token_result { + return Err(error); + } + let existing_listing_result = + crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_listing_option = match existing_listing_result { + Ok(existing_listing_option) => existing_listing_option, + Err(error) => return Err(error), + }; + let created_listing = existing_listing_option.is_none(); + let pool_listing_id = match existing_listing_option { + Some(pool_listing) => pool_listing.id, + None => { + let listing_id_result = self + .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) + .await; + match listing_id_result { + Ok(listing_id) => Some(listing_id), + Err(error) => return Err(error), + } + } + }; + if created_pool { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_clmm.new_pool", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_pair { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_clmm.new_pair", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_listing { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_clmm.first_listing_seen", + crate::KbAnalysisSignalSeverity::Low, + payload_value, + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(crate::KbDexPoolDetectionResult { + decoded_event_id, + dex_id, + pool_id, + pair_id, + pool_listing_id, + created_pool, + created_pair, + created_listing, + }) + } async fn detect_raydium_cpmm_trade( &self, @@ -2980,6 +3200,32 @@ impl KbDexDetectService { } } + async fn ensure_raydium_clmm_dex(&self) -> Result { + let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium_clmm").await; + let dex_option = match dex_result { + Ok(dex_option) => dex_option, + Err(error) => return Err(error), + }; + match dex_option { + Some(dex) => match dex.id { + Some(dex_id) => Ok(dex_id), + None => Err(crate::KbError::InvalidState( + "raydium_clmm dex has no internal id".to_string(), + )), + }, + None => { + let dex_dto = crate::KbDexDto::new( + "raydium_clmm".to_string(), + "Raydium CLMM".to_string(), + Some(crate::KB_RAYDIUM_CLMM_PROGRAM_ID.to_string()), + None, + true, + ); + crate::upsert_dex(self.database.as_ref(), &dex_dto).await + } + } + } + async fn ensure_dexlab_dex(&self) -> Result { let dex_result = crate::get_dex_by_code(self.database.as_ref(), "dexlab").await; let dex_option = match dex_result { diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 085e46d..c442ca9 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -236,6 +236,7 @@ pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; pub use dex::KB_PUMP_FUN_PROGRAM_ID; pub use dex::KB_PUMP_SWAP_PROGRAM_ID; pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use dex::KB_RAYDIUM_CLMM_PROGRAM_ID; pub use dex::KB_RAYDIUM_CPMM_PROGRAM_ID; pub use dex::KbDexlabCreatePoolDecoded; pub use dex::KbDexlabDecodedEvent; @@ -271,9 +272,12 @@ pub use dex::KbPumpSwapTradeDecoded; pub use dex::KbRaydiumAmmV4DecodedEvent; pub use dex::KbRaydiumAmmV4Decoder; pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded; +pub use dex::KbRaydiumClmmDecodedEvent; +pub use dex::KbRaydiumClmmSwapV2Decoded; pub use dex::KbRaydiumCpmmDecodedEvent; pub use dex::KbRaydiumCpmmSwapDecoded; pub use dex::KbRaydiumCpmmSwapMode; +pub use dex::kb_decode_raydium_clmm_instruction; pub use dex::kb_decode_raydium_cpmm_instruction; pub use dex_decode::KbDexDecodeService; pub use dex_detect::KbDexDetectService; diff --git a/kb_lib/src/pair_candle_aggregation.rs b/kb_lib/src/pair_candle_aggregation.rs index 78e137e..e4c1688 100644 --- a/kb_lib/src/pair_candle_aggregation.rs +++ b/kb_lib/src/pair_candle_aggregation.rs @@ -80,14 +80,23 @@ impl KbPairCandleAggregationService { let mut seen = std::collections::HashSet::<(i64, i64, i64)>::new(); let mut results = std::vec::Vec::new(); for trade_event in &trade_events { - let event_time_option = - kb_extract_trade_event_unix_time(self.database.as_ref(), trade_event).await?; + let event_time_option_result = + kb_extract_trade_event_unix_time(self.database.as_ref(), trade_event).await; + let event_time_option = match event_time_option_result { + Ok(event_time_option) => event_time_option, + Err(error) => return Err(error), + }; let event_time_unix = match event_time_option { Some(event_time_unix) => event_time_unix, None => continue, }; for timeframe_seconds in &materialized_timeframes { - let bucket_start_unix = kb_bucket_start_unix(event_time_unix, *timeframe_seconds)?; + let bucket_start_unix_result = + kb_bucket_start_unix(event_time_unix, *timeframe_seconds); + let bucket_start_unix = match bucket_start_unix_result { + Ok(bucket_start_unix) => bucket_start_unix, + Err(error) => return Err(error), + }; let dedupe_key = (trade_event.pair_id, *timeframe_seconds, bucket_start_unix); if seen.contains(&dedupe_key) { continue; @@ -199,7 +208,12 @@ pub(crate) async fn kb_build_candle_from_trade_events( if trade_event.pair_id != pair_id { continue; } - let event_time_option = kb_extract_trade_event_unix_time(database, trade_event).await?; + let event_time_option_result = + kb_extract_trade_event_unix_time(database, trade_event).await; + let event_time_option = match event_time_option_result { + Ok(event_time_option) => event_time_option, + Err(error) => return Err(error), + }; let event_time_unix = match event_time_option { Some(event_time_unix) => event_time_unix, None => continue, diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs index eb4bf62..8b38571 100644 --- a/kb_lib/src/trade_aggregation.rs +++ b/kb_lib/src/trade_aggregation.rs @@ -150,12 +150,34 @@ impl KbTradeAggregationService { let payload = match payload_result { Ok(payload) => payload, Err(error) => { - return Err(crate::KbError::Json(format!( - "cannot parse decoded_event payload_json '{}': {}", - decoded_event.payload_json, error - ))); + tracing::warn!( + event_kind = %decoded_event.event_kind, + pool_account = ?decoded_event.pool_account, + decoded_event_id = ?decoded_event.id, + error = %error, + "skipping decoded event with invalid payload_json" + ); + continue; } }; + if !kb_is_decoded_event_trade_candidate(decoded_event.event_kind.as_str(), &payload) { + tracing::debug!( + event_kind = %decoded_event.event_kind, + pool_account = ?decoded_event.pool_account, + decoded_event_id = ?decoded_event.id, + "skipping non-trade decoded event" + ); + continue; + } + if !kb_is_decoded_event_candle_candidate(decoded_event.event_kind.as_str(), &payload) { + tracing::debug!( + event_kind = %decoded_event.event_kind, + pool_account = ?decoded_event.pool_account, + decoded_event_id = ?decoded_event.id, + "skipping non-candle decoded trade candidate" + ); + continue; + } let trade_side = kb_extract_trade_side(decoded_event.event_kind.as_str(), &payload); let mut base_amount_raw = kb_extract_amount_string( &payload, @@ -253,6 +275,31 @@ impl KbTradeAggregationService { price_quote_per_base = inferred.2; } } + if decoded_event.event_kind.starts_with("raydium_clmm.") + && (base_amount_raw.is_none() + || quote_amount_raw.is_none() + || price_quote_per_base.is_none()) + { + let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas( + transaction.transaction_json.as_str(), + transaction.meta_json.as_deref(), + base_vault_address.as_deref(), + quote_vault_address.as_deref(), + ); + let inferred = match inferred_result { + Ok(inferred) => inferred, + Err(error) => return Err(error), + }; + if base_amount_raw.is_none() { + base_amount_raw = inferred.0; + } + if quote_amount_raw.is_none() { + quote_amount_raw = inferred.1; + } + if price_quote_per_base.is_none() { + price_quote_per_base = inferred.2; + } + } if price_quote_per_base.is_none() { price_quote_per_base = kb_compute_price_quote_per_base_with_decimals( transaction.meta_json.as_deref(), @@ -267,6 +314,23 @@ impl KbTradeAggregationService { quote_amount_raw.as_deref(), ); } + if !kb_is_priced_trade_event( + base_amount_raw.as_deref(), + quote_amount_raw.as_deref(), + price_quote_per_base, + ) { + tracing::debug!( + event_kind = %decoded_event.event_kind, + pool_account = ?decoded_event.pool_account, + decoded_event_id = ?decoded_event.id, + transaction_signature = %transaction.signature, + base_amount_raw = ?base_amount_raw, + quote_amount_raw = ?quote_amount_raw, + price_quote_per_base = ?price_quote_per_base, + "skipping unpriced trade aggregation candidate" + ); + continue; + } let slot_i64 = kb_convert_slot_to_i64(transaction.slot); let created_trade_event = existing_trade_option.is_none(); let trade_event_dto = crate::KbTradeEventDto::new( @@ -404,6 +468,123 @@ impl KbTradeAggregationService { } } +fn kb_is_decoded_event_trade_candidate(event_kind: &str, payload: &serde_json::Value) -> bool { + let trade_candidate_option = kb_extract_top_level_bool_by_candidate_keys( + payload, + &["tradeCandidate", "trade_candidate"], + ); + if let Some(trade_candidate) = trade_candidate_option { + return trade_candidate; + } + let event_category_option = + kb_extract_string_by_candidate_keys(payload, &["eventCategory", "event_category"]); + if let Some(event_category) = event_category_option { + return event_category.as_str() == "trade"; + } + kb_is_trade_event_kind(event_kind) +} + +fn kb_is_decoded_event_candle_candidate(event_kind: &str, payload: &serde_json::Value) -> bool { + let candle_candidate_option = kb_extract_top_level_bool_by_candidate_keys( + payload, + &["candleCandidate", "candle_candidate"], + ); + if let Some(candle_candidate) = candle_candidate_option { + return candle_candidate; + } + if !kb_is_decoded_event_trade_candidate(event_kind, payload) { + return false; + } + kb_is_trade_event_kind(event_kind) +} + +fn kb_extract_top_level_bool_by_candidate_keys( + payload: &serde_json::Value, + candidate_keys: &[&str], +) -> std::option::Option { + let object = match payload.as_object() { + Some(object) => object, + None => return None, + }; + for candidate_key in candidate_keys { + let value_option = object.get(*candidate_key); + let value = match value_option { + Some(value) => value, + None => continue, + }; + if let Some(value_bool) = value.as_bool() { + return Some(value_bool); + } + if let Some(value_i64) = value.as_i64() { + return Some(value_i64 != 0); + } + if let Some(value_u64) = value.as_u64() { + return Some(value_u64 != 0); + } + if let Some(value_text) = value.as_str() { + let normalized = value_text.trim().to_ascii_lowercase(); + if normalized.as_str() == "true" { + return Some(true); + } + if normalized.as_str() == "false" { + return Some(false); + } + if normalized.as_str() == "1" { + return Some(true); + } + if normalized.as_str() == "0" { + return Some(false); + } + } + } + None +} + +fn kb_is_priced_trade_event( + base_amount_raw: std::option::Option<&str>, + quote_amount_raw: std::option::Option<&str>, + price_quote_per_base: std::option::Option, +) -> bool { + let base_amount_raw = match base_amount_raw { + Some(base_amount_raw) => base_amount_raw.trim(), + None => return false, + }; + if base_amount_raw.is_empty() { + return false; + } + let base_amount_result = base_amount_raw.parse::(); + let base_amount = match base_amount_result { + Ok(base_amount) => base_amount, + Err(_) => return false, + }; + if base_amount <= 0 { + return false; + } + let quote_amount_raw = match quote_amount_raw { + Some(quote_amount_raw) => quote_amount_raw.trim(), + None => return false, + }; + if quote_amount_raw.is_empty() { + return false; + } + let quote_amount_result = quote_amount_raw.parse::(); + let quote_amount = match quote_amount_result { + Ok(quote_amount) => quote_amount, + Err(_) => return false, + }; + if quote_amount <= 0 { + return false; + } + let price = match price_quote_per_base { + Some(price) => price, + None => return false, + }; + if !price.is_finite() { + return false; + } + price > 0.0 +} + fn kb_is_trade_event_kind(event_kind: &str) -> bool { if event_kind.ends_with(".swap") { return true; @@ -420,6 +601,18 @@ fn kb_is_trade_event_kind(event_kind: &str) -> bool { if event_kind == "raydium_cpmm.swap_base_output" { return true; } + if event_kind == "raydium_clmm.swap_v2" { + return true; + } + if event_kind == "raydium_clmm.swap_router_base_in" { + return true; + } + if event_kind == "raydium_clmm.swap_router_base_out" { + return true; + } + if event_kind == "raydium_clmm.exact_output" { + return true; + } false } @@ -1240,4 +1433,30 @@ mod tests { }; assert_eq!(pair_metric.trade_count, 1); } + + #[test] + fn kb_is_priced_trade_event_rejects_unpriced_values() { + let result = super::kb_is_priced_trade_event(None, Some("2500"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), None, Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), None); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("0"), Some("2500"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("0"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("-1"), Some("2500"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("-1"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("abc"), Some("2500"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("abc"), Some(2.5)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), Some(0.0)); + assert!(!result); + let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), Some(f64::NAN)); + assert!(!result); + } }