diff --git a/kb_lib-v0.7.24-pre.3.zip b/kb_lib-v0.7.24-pre.3.zip new file mode 100644 index 0000000..9fe9631 Binary files /dev/null and b/kb_lib-v0.7.24-pre.3.zip differ diff --git a/kb_lib/src/dex.rs b/kb_lib/src/dex.rs index ada1e88..44a635f 100644 --- a/kb_lib/src/dex.rs +++ b/kb_lib/src/dex.rs @@ -11,47 +11,53 @@ mod orca_whirlpools; mod pump_fun; mod pump_swap; mod raydium_amm_v4; +mod raydium_cpmm; +pub use dexlab::KB_DEXLAB_PROGRAM_ID; pub use dexlab::KbDexlabCreatePoolDecoded; pub use dexlab::KbDexlabDecodedEvent; pub use dexlab::KbDexlabDecoder; pub use dexlab::KbDexlabSwapDecoded; -pub use dexlab::KB_DEXLAB_PROGRAM_ID; +pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID; pub use fluxbeam::KbFluxbeamCreatePoolDecoded; pub use fluxbeam::KbFluxbeamDecodedEvent; pub use fluxbeam::KbFluxbeamDecoder; pub use fluxbeam::KbFluxbeamSwapDecoded; -pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID; +pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID; pub use meteora_damm_v1::KbMeteoraDammV1CreatePoolDecoded; pub use meteora_damm_v1::KbMeteoraDammV1DecodedEvent; pub use meteora_damm_v1::KbMeteoraDammV1Decoder; pub use meteora_damm_v1::KbMeteoraDammV1SwapDecoded; -pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID; +pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID; pub use meteora_damm_v2::KbMeteoraDammV2CreatePoolDecoded; pub use meteora_damm_v2::KbMeteoraDammV2DecodedEvent; pub use meteora_damm_v2::KbMeteoraDammV2Decoder; pub use meteora_damm_v2::KbMeteoraDammV2SwapDecoded; -pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID; +pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID; pub use meteora_dbc::KbMeteoraDbcCreatePoolDecoded; pub use meteora_dbc::KbMeteoraDbcDecodedEvent; pub use meteora_dbc::KbMeteoraDbcDecoder; pub use meteora_dbc::KbMeteoraDbcSwapDecoded; -pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID; +pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; pub use orca_whirlpools::KbOrcaWhirlpoolsCreatePoolDecoded; pub use orca_whirlpools::KbOrcaWhirlpoolsDecodedEvent; pub use orca_whirlpools::KbOrcaWhirlpoolsDecoder; pub use orca_whirlpools::KbOrcaWhirlpoolsSwapDecoded; -pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; +pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID; pub use pump_fun::KbPumpFunCreateV2TokenDecoded; pub use pump_fun::KbPumpFunDecodedEvent; pub use pump_fun::KbPumpFunDecoder; pub use pump_fun::KbPumpFunTradeDecoded; -pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID; +pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID; pub use pump_swap::KbPumpSwapDecodedEvent; pub use pump_swap::KbPumpSwapDecoder; pub use pump_swap::KbPumpSwapTradeDecoded; -pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID; +pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent; pub use raydium_amm_v4::KbRaydiumAmmV4Decoder; pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded; -pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use raydium_cpmm::KB_RAYDIUM_CPMM_PROGRAM_ID; +pub use raydium_cpmm::KbRaydiumCpmmDecodedEvent; +pub use raydium_cpmm::KbRaydiumCpmmSwapDecoded; +pub use raydium_cpmm::KbRaydiumCpmmSwapMode; +pub use raydium_cpmm::kb_decode_raydium_cpmm_instruction; diff --git a/kb_lib/src/dex/raydium_cpmm.rs b/kb_lib/src/dex/raydium_cpmm.rs new file mode 100644 index 0000000..cbe09e5 --- /dev/null +++ b/kb_lib/src/dex/raydium_cpmm.rs @@ -0,0 +1,473 @@ +// file: kb_lib/src/dex/raydium_cpmm.rs + +//! Raydium CPMM decoder. +//! +//! This module decodes Raydium constant product swap instructions from +//! already-projected Solana transaction instructions. + +/// Raydium CPMM mainnet program id. +pub const KB_RAYDIUM_CPMM_PROGRAM_ID: &str = "CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C"; + +/// Raydium CPMM `swap_base_input` discriminator. +const KB_RAYDIUM_CPMM_SWAP_BASE_INPUT_DISCRIMINATOR: [u8; 8] = + [143, 190, 90, 218, 196, 30, 51, 222]; + +/// Raydium CPMM `swap_base_output` discriminator. +const KB_RAYDIUM_CPMM_SWAP_BASE_OUTPUT_DISCRIMINATOR: [u8; 8] = + [55, 217, 98, 86, 163, 74, 180, 173]; + +/// Raydium CPMM decoded event. +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum KbRaydiumCpmmDecodedEvent { + /// Swap where the user fixes the input amount. + SwapBaseInput(KbRaydiumCpmmSwapDecoded), + /// Swap where the user fixes the output amount. + SwapBaseOutput(KbRaydiumCpmmSwapDecoded), +} + +/// Raydium CPMM swap mode. +#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub enum KbRaydiumCpmmSwapMode { + /// Fixed input swap. + BaseInput, + /// Fixed output swap. + BaseOutput, +} + +/// Raydium CPMM decoded swap. +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub struct KbRaydiumCpmmSwapDecoded { + /// Instruction mode. + pub swap_mode: KbRaydiumCpmmSwapMode, + /// User or payer account. + pub payer: std::string::String, + /// Raydium authority account. + pub authority: std::string::String, + /// AMM config account. + pub amm_config: std::string::String, + /// CPMM pool state account. + pub pool_state: std::string::String, + /// User input token account. + pub input_token_account: std::string::String, + /// User output token account. + pub output_token_account: std::string::String, + /// Pool input vault. + pub input_vault: std::string::String, + /// Pool output vault. + pub output_vault: std::string::String, + /// Input token program. + pub input_token_program: std::string::String, + /// Output token program. + pub output_token_program: std::string::String, + /// Input token mint. + pub input_token_mint: std::string::String, + /// Output token mint. + pub output_token_mint: std::string::String, + /// Observation state. + pub observation_state: std::string::String, + /// Normalized base mint. + pub base_mint: std::string::String, + /// Normalized quote mint. + pub quote_mint: std::string::String, + /// Normalized base vault. + pub base_vault: std::string::String, + /// Normalized quote vault. + pub quote_vault: std::string::String, + /// True when input mint is the normalized base mint. + pub input_is_base: bool, + /// Trade side from the normalized pair perspective. + pub trade_side: std::string::String, + /// Fixed input amount, for swap_base_input. + pub amount_in_raw: std::option::Option, + /// Minimum output amount, for swap_base_input. + pub minimum_amount_out_raw: std::option::Option, + /// Maximum input amount, for swap_base_output. + pub max_amount_in_raw: std::option::Option, + /// Fixed output amount, for swap_base_output. + pub amount_out_raw: std::option::Option, +} + +impl KbRaydiumCpmmDecodedEvent { + /// Returns the storage event kind. + pub fn event_kind(&self) -> &'static str { + match self { + KbRaydiumCpmmDecodedEvent::SwapBaseInput(_) => "raydium_cpmm.swap_base_input", + KbRaydiumCpmmDecodedEvent::SwapBaseOutput(_) => "raydium_cpmm.swap_base_output", + } + } + + /// Returns the pool account. + pub fn pool_account(&self) -> &str { + match self { + KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.pool_state.as_str(), + KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.pool_state.as_str(), + } + } + + /// Returns the normalized base mint. + pub fn base_mint(&self) -> &str { + match self { + KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.base_mint.as_str(), + KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.base_mint.as_str(), + } + } + + /// Returns the normalized quote mint. + pub fn quote_mint(&self) -> &str { + match self { + KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.quote_mint.as_str(), + KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.quote_mint.as_str(), + } + } + + /// Converts the decoded event to JSON payload. + pub fn to_payload_json(&self) -> std::option::Option { + match self { + crate::KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => { + let result = serde_json::to_string(event); + match result { + Ok(payload) => Some(payload), + Err(_) => None, + } + } + crate::KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => { + let result = serde_json::to_string(event); + match result { + Ok(payload) => Some(payload), + Err(_) => None, + } + } + } + } +} + +/// Decodes one Raydium CPMM instruction from projected instruction fields. +pub fn kb_decode_raydium_cpmm_instruction( + accounts_json: &str, + data_json: &str, +) -> std::vec::Vec { + let accounts = match kb_parse_accounts_json(accounts_json) { + Some(accounts) => accounts, + None => return std::vec::Vec::new(), + }; + let data_base58 = match kb_parse_data_json_as_base58(data_json) { + Some(data_base58) => data_base58, + None => return std::vec::Vec::new(), + }; + let data = match bs58::decode(data_base58.as_str()).into_vec() { + Ok(data) => data, + Err(_) => return std::vec::Vec::new(), + }; + if data.len() < 24 { + return std::vec::Vec::new(); + } + let discriminator = [ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]; + if discriminator == KB_RAYDIUM_CPMM_SWAP_BASE_INPUT_DISCRIMINATOR { + let amount_in = match kb_read_u64_le(data.as_slice(), 8) { + Some(value) => value, + None => return std::vec::Vec::new(), + }; + let minimum_amount_out = match kb_read_u64_le(data.as_slice(), 16) { + Some(value) => value, + None => return std::vec::Vec::new(), + }; + let swap = match kb_build_raydium_cpmm_swap( + KbRaydiumCpmmSwapMode::BaseInput, + accounts.as_slice(), + Some(amount_in.to_string()), + Some(minimum_amount_out.to_string()), + None, + None, + ) { + Some(swap) => swap, + None => return std::vec::Vec::new(), + }; + return vec![KbRaydiumCpmmDecodedEvent::SwapBaseInput(swap)]; + } + if discriminator == KB_RAYDIUM_CPMM_SWAP_BASE_OUTPUT_DISCRIMINATOR { + let max_amount_in = match kb_read_u64_le(data.as_slice(), 8) { + Some(value) => value, + None => return std::vec::Vec::new(), + }; + let amount_out = match kb_read_u64_le(data.as_slice(), 16) { + Some(value) => value, + None => return std::vec::Vec::new(), + }; + let swap = match kb_build_raydium_cpmm_swap( + KbRaydiumCpmmSwapMode::BaseOutput, + accounts.as_slice(), + None, + None, + Some(max_amount_in.to_string()), + Some(amount_out.to_string()), + ) { + Some(swap) => swap, + None => return std::vec::Vec::new(), + }; + return vec![KbRaydiumCpmmDecodedEvent::SwapBaseOutput(swap)]; + } + std::vec::Vec::new() +} + +fn kb_build_raydium_cpmm_swap( + swap_mode: KbRaydiumCpmmSwapMode, + accounts: &[std::string::String], + amount_in_raw: std::option::Option, + minimum_amount_out_raw: std::option::Option, + max_amount_in_raw: std::option::Option, + amount_out_raw: std::option::Option, +) -> std::option::Option { + if accounts.len() < 13 { + return None; + } + let input_token_mint = accounts[10].clone(); + let output_token_mint = accounts[11].clone(); + let input_vault = accounts[6].clone(); + let output_vault = accounts[7].clone(); + let normalized = kb_normalize_raydium_cpmm_pair( + input_token_mint.as_str(), + output_token_mint.as_str(), + input_vault.as_str(), + output_vault.as_str(), + ); + let input_is_base = normalized.input_is_base; + let trade_side = if input_is_base { + "sell".to_string() + } else { + "buy".to_string() + }; + Some(KbRaydiumCpmmSwapDecoded { + swap_mode, + payer: accounts[0].clone(), + authority: accounts[1].clone(), + amm_config: accounts[2].clone(), + pool_state: accounts[3].clone(), + input_token_account: accounts[4].clone(), + output_token_account: accounts[5].clone(), + input_vault, + output_vault, + input_token_program: accounts[8].clone(), + output_token_program: accounts[9].clone(), + input_token_mint, + output_token_mint, + observation_state: accounts[12].clone(), + base_mint: normalized.base_mint, + quote_mint: normalized.quote_mint, + base_vault: normalized.base_vault, + quote_vault: normalized.quote_vault, + input_is_base, + trade_side, + amount_in_raw, + minimum_amount_out_raw, + max_amount_in_raw, + amount_out_raw, + }) +} + +struct KbRaydiumCpmmNormalizedPair { + base_mint: std::string::String, + quote_mint: std::string::String, + base_vault: std::string::String, + quote_vault: std::string::String, + input_is_base: bool, +} + +fn kb_normalize_raydium_cpmm_pair( + input_mint: &str, + output_mint: &str, + input_vault: &str, + output_vault: &str, +) -> KbRaydiumCpmmNormalizedPair { + if kb_is_quote_mint(output_mint) && !kb_is_quote_mint(input_mint) { + return KbRaydiumCpmmNormalizedPair { + base_mint: input_mint.to_string(), + quote_mint: output_mint.to_string(), + base_vault: input_vault.to_string(), + quote_vault: output_vault.to_string(), + input_is_base: true, + }; + } + if kb_is_quote_mint(input_mint) && !kb_is_quote_mint(output_mint) { + return KbRaydiumCpmmNormalizedPair { + base_mint: output_mint.to_string(), + quote_mint: input_mint.to_string(), + base_vault: output_vault.to_string(), + quote_vault: input_vault.to_string(), + input_is_base: false, + }; + } + if input_mint <= output_mint { + return KbRaydiumCpmmNormalizedPair { + base_mint: input_mint.to_string(), + quote_mint: output_mint.to_string(), + base_vault: input_vault.to_string(), + quote_vault: output_vault.to_string(), + input_is_base: true, + }; + } + KbRaydiumCpmmNormalizedPair { + base_mint: output_mint.to_string(), + quote_mint: input_mint.to_string(), + base_vault: output_vault.to_string(), + quote_vault: input_vault.to_string(), + input_is_base: false, + } +} + +fn kb_is_quote_mint(mint: &str) -> bool { + if mint == "So11111111111111111111111111111111111111112" { + return true; + } + if mint == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" { + return true; + } + if mint == "Es9vMFrzaCERmJfrF4H2FYD4KMZbsJyNeYrBbqD6CbK" { + return true; + } + if mint == "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB" { + return true; + } + false +} + +fn kb_parse_accounts_json( + accounts_json: &str, +) -> std::option::Option> { + let result = serde_json::from_str::>(accounts_json); + match result { + Ok(accounts) => Some(accounts), + Err(_) => None, + } +} + +fn kb_parse_data_json_as_base58(data_json: &str) -> std::option::Option { + let json_string_result = serde_json::from_str::(data_json); + match json_string_result { + Ok(value) => return Some(value), + Err(_) => {} + } + let trimmed = data_json.trim(); + if trimmed.is_empty() { + return None; + } + let without_quotes = trimmed.trim_matches('"'); + if without_quotes.is_empty() { + return None; + } + Some(without_quotes.to_string()) +} + +fn kb_read_u64_le(data: &[u8], offset: usize) -> std::option::Option { + if data.len() < offset + 8 { + return None; + } + let bytes = [ + data[offset], + data[offset + 1], + data[offset + 2], + data[offset + 3], + data[offset + 4], + data[offset + 5], + data[offset + 6], + data[offset + 7], + ]; + Some(u64::from_le_bytes(bytes)) +} + +#[cfg(test)] +mod tests { + #[test] + fn decodes_swap_base_input() { + let accounts_json = r#"[ + "ARu4n5mFdZogZAravu7CcizaojWnS6oqka37gdLT5SZn", + "GpMZbSM2GgvTKHJirzeGfMFoaZ8UR2X7F4v8vHTvxFbL", + "GtdipqAcw8eAYxGcANs3vpN7UaN7UH7u89Kd8wPqLThd", + "2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV", + "3tBz6MmUTaf2QdnAnYYJGDpgbTXdANP9v4CtUL42GZaZ", + "C1qMskwQhQ4cn1M6RG8uWfUuMrq6ysRPEHccGxk4tjw8", + "8aHJfDQBmngBpFwLysD1bhj8LKh7ywbxfUxsy7VQkx22", + "A4jPLDGrkAogPgm8KqQzZQ4FoNJuKuKMCbNqgQkpT5Ci", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk", + "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB", + "9fzQ17bEnqJSsyHL5CodJqY2sdjZJBQCQLbgFZ1BYTWn" + ]"#; + let events = crate::kb_decode_raydium_cpmm_instruction( + accounts_json, + r#""E73fXHPWvSRF4Q2ZQFvPoeJBVGDUEMmxB""#, + ); + assert_eq!(events.len(), 1); + match &events[0] { + crate::KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => { + assert_eq!( + event.pool_state, + "2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV" + ); + assert_eq!( + event.base_mint, + "Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk" + ); + assert_eq!( + event.quote_mint, + "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB" + ); + assert_eq!(event.input_is_base, true); + assert_eq!(event.trade_side, "sell"); + assert_eq!(event.amount_in_raw.is_some(), true); + assert_eq!(event.minimum_amount_out_raw.is_some(), true); + } + _ => { + panic!("expected swap base input"); + } + } + } + #[test] + fn decodes_swap_base_output() { + let accounts_json = r#"[ + "JD6rVaerbyz6wjQ433nrw6bFTgFrp46MiYmi8EtUAfsG", + "GpMZbSM2GgvTKHJirzeGfMFoaZ8UR2X7F4v8vHTvxFbL", + "GtdipqAcw8eAYxGcANs3vpN7UaN7UH7u89Kd8wPqLThd", + "2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV", + "4Fg913oAKp7BiW6WvUF32RvACsBYusMTQ16nmA6Sd9tp", + "Cvcy56pbaDms71KyxAWFTQ24n5KbCy8KNa7zMhKGcJa6", + "A4jPLDGrkAogPgm8KqQzZQ4FoNJuKuKMCbNqgQkpT5Ci", + "8aHJfDQBmngBpFwLysD1bhj8LKh7ywbxfUxsy7VQkx22", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB", + "Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk", + "9fzQ17bEnqJSsyHL5CodJqY2sdjZJBQCQLbgFZ1BYTWn" + ]"#; + let events = crate::kb_decode_raydium_cpmm_instruction( + accounts_json, + r#""66JafaVu7KNEtTngqQyD7ETVurF3rxJ47""#, + ); + assert_eq!(events.len(), 1); + match &events[0] { + crate::KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => { + assert_eq!( + event.pool_state, + "2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV" + ); + assert_eq!( + event.base_mint, + "Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk" + ); + assert_eq!( + event.quote_mint, + "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB" + ); + assert_eq!(event.input_is_base, false); + assert_eq!(event.trade_side, "buy"); + assert_eq!(event.max_amount_in_raw.is_some(), true); + assert_eq!(event.amount_out_raw.is_some(), true); + } + _ => { + panic!("expected swap base output"); + } + } + } +} diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs index d550ac0..6e755c4 100644 --- a/kb_lib/src/dex_decode.rs +++ b/kb_lib/src/dex_decode.rs @@ -86,7 +86,7 @@ impl KbDexDecodeService { }; for decoded_event in &raydium_decoded { let persist_result = self - .persist_raydium_event(&transaction, decoded_event) + .persist_raydium_amm_v4_event(&transaction, decoded_event) .await; let persisted_event = match persist_result { Ok(persisted_event) => persisted_event, @@ -94,6 +94,16 @@ impl KbDexDecodeService { }; persisted.push(persisted_event); } + let raydium_cpmm_persisted_result = self + .decode_and_persist_raydium_cpmm_events(&transaction, &instructions) + .await; + let raydium_cpmm_persisted = match raydium_cpmm_persisted_result { + Ok(raydium_cpmm_persisted) => raydium_cpmm_persisted, + Err(error) => return Err(error), + }; + for persisted_event in raydium_cpmm_persisted { + persisted.push(persisted_event); + } let pump_fun_decoded_result = self .pump_fun_decoder .decode_transaction(&transaction, &instructions); @@ -1414,7 +1424,7 @@ impl KbDexDecodeService { } } - async fn persist_raydium_event( + async fn persist_raydium_amm_v4_event( &self, transaction: &crate::KbChainTransactionDto, decoded_event: &crate::KbRaydiumAmmV4DecodedEvent, @@ -1517,6 +1527,168 @@ impl KbDexDecodeService { } } + async fn decode_and_persist_raydium_cpmm_events( + &self, + transaction: &crate::KbChainTransactionDto, + instructions: &[crate::KbChainInstructionDto], + ) -> Result, crate::KbError> { + let mut persisted = std::vec::Vec::new(); + for instruction in instructions { + let program_id = match instruction.program_id.as_ref() { + Some(program_id) => program_id, + None => continue, + }; + if program_id.as_str() != crate::KB_RAYDIUM_CPMM_PROGRAM_ID { + continue; + } + let data_json = match instruction.data_json.as_ref() { + Some(data_json) => data_json, + None => continue, + }; + let decoded_events = crate::kb_decode_raydium_cpmm_instruction( + instruction.accounts_json.as_str(), + data_json.as_str(), + ); + for decoded_event in &decoded_events { + let persist_result = self + .persist_raydium_cpmm_event(transaction, instruction, decoded_event) + .await; + let persisted_event = match persist_result { + Ok(persisted_event) => persisted_event, + Err(error) => return Err(error), + }; + persisted.push(persisted_event); + } + } + Ok(persisted) + } + + async fn persist_raydium_cpmm_event( + &self, + transaction: &crate::KbChainTransactionDto, + instruction: &crate::KbChainInstructionDto, + decoded_event: &crate::KbRaydiumCpmmDecodedEvent, + ) -> Result { + let transaction_id = match transaction.id { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "transaction '{}' has no internal id", + transaction.signature + ))); + } + }; + let instruction_id = match instruction.id { + Some(instruction_id) => instruction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "raydium cpmm instruction for transaction '{}' has no internal id", + transaction.signature + ))); + } + }; + let payload_json = match decoded_event.to_payload_json() { + Some(payload_json) => payload_json, + None => { + return Err(crate::KbError::Json( + "cannot serialize decoded raydium cpmm payload".to_string(), + )); + } + }; + let event_kind = decoded_event.event_kind().to_string(); + let existing_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + transaction_id, + Some(instruction_id), + event_kind.as_str(), + ) + .await; + let existing_option = match existing_result { + Ok(existing_option) => existing_option, + Err(error) => return Err(error), + }; + let already_present = existing_option.is_some(); + let dto = crate::KbDexDecodedEventDto::new( + transaction_id, + Some(instruction_id), + "raydium_cpmm".to_string(), + crate::KB_RAYDIUM_CPMM_PROGRAM_ID.to_string(), + event_kind.clone(), + Some(decoded_event.pool_account().to_string()), + None, + Some(decoded_event.base_mint().to_string()), + Some(decoded_event.quote_mint().to_string()), + None, + payload_json.clone(), + ); + let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await; + if let Err(error) = upsert_result { + return Err(error); + } + let fetched_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + transaction_id, + Some(instruction_id), + event_kind.as_str(), + ) + .await; + let fetched_option = match fetched_result { + Ok(fetched_option) => fetched_option, + Err(error) => return Err(error), + }; + let fetched = match fetched_option { + Some(fetched) => fetched, + None => { + return Err(crate::KbError::InvalidState( + "decoded raydium cpmm event disappeared after upsert".to_string(), + )); + } + }; + if !already_present { + let payload_value_result = + serde_json::from_str::(payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse raydium cpmm payload after serialization: {}", + error + ))); + } + }; + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + format!("dex.{}", event_kind), + crate::KbObservationSourceKind::HttpRpc, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload_value.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + format!("signal.dex.{}", event_kind), + crate::KbAnalysisSignalSeverity::Low, + transaction.signature.clone(), + Some(observation_id), + None, + payload_value, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(fetched) + } + async fn persist_pump_fun_event( &self, transaction: &crate::KbChainTransactionDto, diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs index e996f74..f8d5a4b 100644 --- a/kb_lib/src/dex_detect.rs +++ b/kb_lib/src/dex_detect.rs @@ -93,6 +93,19 @@ impl KbDexDetectService { }; detection_results.push(detect_result); } + if decoded_event.protocol_name == "raydium_cpmm" + && (decoded_event.event_kind == "raydium_cpmm.swap_base_input" + || decoded_event.event_kind == "raydium_cpmm.swap_base_output") + { + let detect_result = self + .detect_raydium_cpmm_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } if decoded_event.protocol_name == "pump_fun" && decoded_event.event_kind == "pump_fun.create_v2_token" { @@ -2735,6 +2748,238 @@ impl KbDexDetectService { }) } + async fn detect_raydium_cpmm_trade( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbDexDecodedEventDto, + ) -> Result { + let decoded_event_id = match decoded_event.id { + Some(decoded_event_id) => decoded_event_id, + None => { + return Err(crate::KbError::InvalidState( + "decoded dex event has no internal id".to_string(), + )); + } + }; + let dex_id_result = self.ensure_raydium_cpmm_dex().await; + let dex_id = match dex_id_result { + Ok(dex_id) => dex_id, + Err(error) => return Err(error), + }; + let pool_address = match decoded_event.pool_account.clone() { + Some(pool_address) => pool_address, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no pool_account", + decoded_event_id + ))); + } + }; + let base_mint = match decoded_event.token_a_mint.clone() { + Some(base_mint) => base_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_a_mint", + decoded_event_id + ))); + } + }; + let quote_mint = match decoded_event.token_b_mint.clone() { + Some(quote_mint) => quote_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_b_mint", + decoded_event_id + ))); + } + }; + let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => return Err(error), + }; + let base_vault_address = kb_extract_payload_string_field(&payload_value, "base_vault"); + let quote_vault_address = kb_extract_payload_string_field(&payload_value, "quote_vault"); + let base_token_id_result = self.ensure_token(base_mint.as_str()).await; + let base_token_id = match base_token_id_result { + Ok(base_token_id) => base_token_id, + Err(error) => return Err(error), + }; + let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; + let quote_token_id = match quote_token_id_result { + Ok(quote_token_id) => quote_token_id, + Err(error) => return Err(error), + }; + let existing_pool_result = + crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; + let existing_pool_option = match existing_pool_result { + Ok(existing_pool_option) => existing_pool_option, + Err(error) => return Err(error), + }; + let created_pool = existing_pool_option.is_none(); + let pool_id = match existing_pool_option { + Some(pool) => match pool.id { + Some(pool_id) => pool_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pool '{}' has no internal id", + pool.address + ))); + } + }, + None => { + let pool_dto = crate::KbPoolDto::new( + dex_id, + pool_address.clone(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ); + let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + match upsert_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + } + } + }; + let existing_pair_result = + crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_pair_option = match existing_pair_result { + Ok(existing_pair_option) => existing_pair_option, + Err(error) => return Err(error), + }; + let created_pair = existing_pair_option.is_none(); + let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); + let pair_dto = + crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol); + let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + let pair_id = match pair_id_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), + }; + let upsert_base_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + base_token_id, + crate::KbPoolTokenRole::Base, + base_vault_address, + Some(0), + ), + ) + .await; + if let Err(error) = upsert_base_pool_token_result { + return Err(error); + } + let upsert_quote_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + quote_token_id, + crate::KbPoolTokenRole::Quote, + quote_vault_address, + Some(1), + ), + ) + .await; + if let Err(error) = upsert_quote_pool_token_result { + return Err(error); + } + let existing_listing_result = + crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_listing_option = match existing_listing_result { + Ok(existing_listing_option) => existing_listing_option, + Err(error) => return Err(error), + }; + let created_listing = existing_listing_option.is_none(); + let pool_listing_id = match existing_listing_option { + Some(pool_listing) => pool_listing.id, + None => { + let listing_id_result = self + .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) + .await; + match listing_id_result { + Ok(listing_id) => Some(listing_id), + Err(error) => return Err(error), + } + } + }; + if created_pool { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_cpmm.new_pool", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_pair { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_cpmm.new_pair", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_listing { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.raydium_cpmm.first_listing_seen", + crate::KbAnalysisSignalSeverity::Low, + payload_value, + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(crate::KbDexPoolDetectionResult { + decoded_event_id, + dex_id, + pool_id, + pair_id, + pool_listing_id, + created_pool, + created_pair, + created_listing, + }) + } + + async fn ensure_raydium_cpmm_dex(&self) -> Result { + let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium_cpmm").await; + let dex_option = match dex_result { + Ok(dex_option) => dex_option, + Err(error) => return Err(error), + }; + match dex_option { + Some(dex) => match dex.id { + Some(dex_id) => Ok(dex_id), + None => Err(crate::KbError::InvalidState( + "raydium_cpmm dex has no internal id".to_string(), + )), + }, + None => { + let dex_dto = crate::KbDexDto::new( + "raydium_cpmm".to_string(), + "Raydium CPMM".to_string(), + Some(crate::KB_RAYDIUM_CPMM_PROGRAM_ID.to_string()), + None, + true, + ); + crate::upsert_dex(self.database.as_ref(), &dex_dto).await + } + } + } + async fn ensure_dexlab_dex(&self) -> Result { let dex_result = crate::get_dex_by_code(self.database.as_ref(), "dexlab").await; let dex_option = match dex_result { @@ -3160,6 +3405,21 @@ fn kb_extract_pump_swap_vault_addresses( (token_a_vault_address, token_b_vault_address) } +fn kb_extract_payload_string_field( + payload_value: &serde_json::Value, + field_name: &str, +) -> std::option::Option { + let value_option = payload_value.get(field_name); + let value = match value_option { + Some(value) => value, + None => return None, + }; + match value.as_str() { + Some(value) => Some(value.to_string()), + None => None, + } +} + #[cfg(test)] mod tests { async fn make_database() -> std::sync::Arc { diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 488973f..085e46d 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -49,6 +49,82 @@ pub use config::KbSolanaConfig; pub use config::KbSqliteDatabaseConfig; pub use config::KbWsEndpointConfig; pub use constants::*; +pub use db::KbAnalysisSignalDto; +pub use db::KbAnalysisSignalEntity; +pub use db::KbAnalysisSignalSeverity; +pub use db::KbChainInstructionDto; +pub use db::KbChainInstructionEntity; +pub use db::KbChainSlotDto; +pub use db::KbChainSlotEntity; +pub use db::KbChainTransactionDto; +pub use db::KbChainTransactionEntity; +pub use db::KbDatabase; +pub use db::KbDatabaseBackend; +pub use db::KbDatabaseConnection; +pub use db::KbDbMetadataDto; +pub use db::KbDbMetadataEntity; +pub use db::KbDbRuntimeEventDto; +pub use db::KbDbRuntimeEventEntity; +pub use db::KbDbRuntimeEventLevel; +pub use db::KbDexDecodedEventDto; +pub use db::KbDexDecodedEventEntity; +pub use db::KbDexDto; +pub use db::KbDexEntity; +pub use db::KbKnownHttpEndpointDto; +pub use db::KbKnownHttpEndpointEntity; +pub use db::KbKnownWsEndpointDto; +pub use db::KbKnownWsEndpointEntity; +pub use db::KbLaunchAttributionDto; +pub use db::KbLaunchAttributionEntity; +pub use db::KbLaunchSurfaceDto; +pub use db::KbLaunchSurfaceEntity; +pub use db::KbLaunchSurfaceKeyDto; +pub use db::KbLaunchSurfaceKeyEntity; +pub use db::KbLiquidityEventDto; +pub use db::KbLiquidityEventEntity; +pub use db::KbLiquidityEventKind; +pub use db::KbObservationSourceKind; +pub use db::KbObservedTokenDto; +pub use db::KbObservedTokenEntity; +pub use db::KbObservedTokenStatus; +pub use db::KbOnchainObservationDto; +pub use db::KbOnchainObservationEntity; +pub use db::KbPairAnalyticSignalDto; +pub use db::KbPairAnalyticSignalEntity; +pub use db::KbPairCandleDto; +pub use db::KbPairCandleEntity; +pub use db::KbPairDto; +pub use db::KbPairEntity; +pub use db::KbPairMetricDto; +pub use db::KbPairMetricEntity; +pub use db::KbPoolDto; +pub use db::KbPoolEntity; +pub use db::KbPoolKind; +pub use db::KbPoolListingDto; +pub use db::KbPoolListingEntity; +pub use db::KbPoolOriginDto; +pub use db::KbPoolOriginEntity; +pub use db::KbPoolStatus; +pub use db::KbPoolTokenDto; +pub use db::KbPoolTokenEntity; +pub use db::KbPoolTokenRole; +pub use db::KbSwapDto; +pub use db::KbSwapEntity; +pub use db::KbSwapTradeSide; +pub use db::KbTokenBurnEventDto; +pub use db::KbTokenBurnEventEntity; +pub use db::KbTokenDto; +pub use db::KbTokenEntity; +pub use db::KbTokenMintEventDto; +pub use db::KbTokenMintEventEntity; +pub use db::KbTradeEventDto; +pub use db::KbTradeEventEntity; +pub use db::KbWalletDto; +pub use db::KbWalletEntity; +pub use db::KbWalletHoldingDto; +pub use db::KbWalletHoldingEntity; +pub use db::KbWalletParticipationDto; +pub use db::KbWalletParticipationEntity; pub use db::delete_chain_instructions_by_transaction_id; pub use db::get_chain_slot; pub use db::get_chain_transaction_by_signature; @@ -139,82 +215,6 @@ pub use db::upsert_trade_event; pub use db::upsert_wallet; pub use db::upsert_wallet_holding; pub use db::upsert_wallet_participation; -pub use db::KbAnalysisSignalDto; -pub use db::KbAnalysisSignalEntity; -pub use db::KbAnalysisSignalSeverity; -pub use db::KbChainInstructionDto; -pub use db::KbChainInstructionEntity; -pub use db::KbChainSlotDto; -pub use db::KbChainSlotEntity; -pub use db::KbChainTransactionDto; -pub use db::KbChainTransactionEntity; -pub use db::KbDatabase; -pub use db::KbDatabaseBackend; -pub use db::KbDatabaseConnection; -pub use db::KbDbMetadataDto; -pub use db::KbDbMetadataEntity; -pub use db::KbDbRuntimeEventDto; -pub use db::KbDbRuntimeEventEntity; -pub use db::KbDbRuntimeEventLevel; -pub use db::KbDexDecodedEventDto; -pub use db::KbDexDecodedEventEntity; -pub use db::KbDexDto; -pub use db::KbDexEntity; -pub use db::KbKnownHttpEndpointDto; -pub use db::KbKnownHttpEndpointEntity; -pub use db::KbKnownWsEndpointDto; -pub use db::KbKnownWsEndpointEntity; -pub use db::KbLaunchAttributionDto; -pub use db::KbLaunchAttributionEntity; -pub use db::KbLaunchSurfaceDto; -pub use db::KbLaunchSurfaceEntity; -pub use db::KbLaunchSurfaceKeyDto; -pub use db::KbLaunchSurfaceKeyEntity; -pub use db::KbLiquidityEventDto; -pub use db::KbLiquidityEventEntity; -pub use db::KbLiquidityEventKind; -pub use db::KbObservationSourceKind; -pub use db::KbObservedTokenDto; -pub use db::KbObservedTokenEntity; -pub use db::KbObservedTokenStatus; -pub use db::KbOnchainObservationDto; -pub use db::KbOnchainObservationEntity; -pub use db::KbPairAnalyticSignalDto; -pub use db::KbPairAnalyticSignalEntity; -pub use db::KbPairCandleDto; -pub use db::KbPairCandleEntity; -pub use db::KbPairDto; -pub use db::KbPairEntity; -pub use db::KbPairMetricDto; -pub use db::KbPairMetricEntity; -pub use db::KbPoolDto; -pub use db::KbPoolEntity; -pub use db::KbPoolKind; -pub use db::KbPoolListingDto; -pub use db::KbPoolListingEntity; -pub use db::KbPoolOriginDto; -pub use db::KbPoolOriginEntity; -pub use db::KbPoolStatus; -pub use db::KbPoolTokenDto; -pub use db::KbPoolTokenEntity; -pub use db::KbPoolTokenRole; -pub use db::KbSwapDto; -pub use db::KbSwapEntity; -pub use db::KbSwapTradeSide; -pub use db::KbTokenBurnEventDto; -pub use db::KbTokenBurnEventEntity; -pub use db::KbTokenDto; -pub use db::KbTokenEntity; -pub use db::KbTokenMintEventDto; -pub use db::KbTokenMintEventEntity; -pub use db::KbTradeEventDto; -pub use db::KbTradeEventEntity; -pub use db::KbWalletDto; -pub use db::KbWalletEntity; -pub use db::KbWalletHoldingDto; -pub use db::KbWalletHoldingEntity; -pub use db::KbWalletParticipationDto; -pub use db::KbWalletParticipationEntity; pub use detect::KbDetectionObservationInput; pub use detect::KbDetectionPersistenceService; pub use detect::KbDetectionPoolCandidateInput; @@ -227,6 +227,16 @@ pub use detect::KbSolanaWsDetectionService; pub use detect::KbWsDetectionNotificationEnvelope; pub use detect::KbWsDetectionRelay; pub use detect::KbWsDetectionRelayStats; +pub use dex::KB_DEXLAB_PROGRAM_ID; +pub use dex::KB_FLUXBEAM_PROGRAM_ID; +pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID; +pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID; +pub use dex::KB_METEORA_DBC_PROGRAM_ID; +pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; +pub use dex::KB_PUMP_FUN_PROGRAM_ID; +pub use dex::KB_PUMP_SWAP_PROGRAM_ID; +pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use dex::KB_RAYDIUM_CPMM_PROGRAM_ID; pub use dex::KbDexlabCreatePoolDecoded; pub use dex::KbDexlabDecodedEvent; pub use dex::KbDexlabDecoder; @@ -261,21 +271,14 @@ pub use dex::KbPumpSwapTradeDecoded; pub use dex::KbRaydiumAmmV4DecodedEvent; pub use dex::KbRaydiumAmmV4Decoder; pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded; -pub use dex::KB_DEXLAB_PROGRAM_ID; -pub use dex::KB_FLUXBEAM_PROGRAM_ID; -pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID; -pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID; -pub use dex::KB_METEORA_DBC_PROGRAM_ID; -pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; -pub use dex::KB_PUMP_FUN_PROGRAM_ID; -pub use dex::KB_PUMP_SWAP_PROGRAM_ID; -pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use dex::KbRaydiumCpmmDecodedEvent; +pub use dex::KbRaydiumCpmmSwapDecoded; +pub use dex::KbRaydiumCpmmSwapMode; +pub use dex::kb_decode_raydium_cpmm_instruction; pub use dex_decode::KbDexDecodeService; pub use dex_detect::KbDexDetectService; pub use dex_detect::KbDexPoolDetectionResult; pub use error::KbError; -pub use http_client::parse_kb_json_rpc_http_response_text; -pub use http_client::parse_kb_json_rpc_http_response_value; pub use http_client::HttpClient; pub use http_client::KbHttpEndpointStatus; pub use http_client::KbHttpMethodClass; @@ -284,11 +287,10 @@ pub use http_client::KbJsonRpcHttpErrorResponse; pub use http_client::KbJsonRpcHttpRequest; pub use http_client::KbJsonRpcHttpResponse; pub use http_client::KbJsonRpcHttpSuccessResponse; +pub use http_client::parse_kb_json_rpc_http_response_text; +pub use http_client::parse_kb_json_rpc_http_response_value; pub use http_pool::HttpEndpointPool; pub use http_pool::KbHttpPoolClientSnapshot; -pub use json_rpc_ws::kb_is_probable_json_rpc_object_text; -pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; -pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use json_rpc_ws::KbJsonRpcWsErrorObject; pub use json_rpc_ws::KbJsonRpcWsErrorResponse; pub use json_rpc_ws::KbJsonRpcWsIncomingMessage; @@ -296,6 +298,9 @@ pub use json_rpc_ws::KbJsonRpcWsNotification; pub use json_rpc_ws::KbJsonRpcWsNotificationParams; pub use json_rpc_ws::KbJsonRpcWsRequest; pub use json_rpc_ws::KbJsonRpcWsSuccessResponse; +pub use json_rpc_ws::kb_is_probable_json_rpc_object_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use launch_origin::KbLaunchAttributionResult; pub use launch_origin::KbLaunchOriginService; pub use pair_analytic_signal::KbPairAnalyticSignalResult; @@ -305,14 +310,14 @@ pub use pair_candle_aggregation::KbPairCandleAggregationService; pub use pair_candle_query::KbPairCandleQueryService; pub use pool_origin::KbPoolOriginResult; pub use pool_origin::KbPoolOriginService; +pub use solana_pubsub_ws::KbSolanaWsTypedNotification; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; -pub use solana_pubsub_ws::KbSolanaWsTypedNotification; pub use token_backfill::KbPoolBackfillResult; pub use token_backfill::KbTokenBackfillResult; pub use token_backfill::KbTokenBackfillService; -pub use tracing::init_tracing; pub use tracing::KbTracingGuard; +pub use tracing::init_tracing; pub use trade_aggregation::KbTradeAggregationResult; pub use trade_aggregation::KbTradeAggregationService; pub use tx_model::KbTransactionModelService; diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs index 25a0e04..eb4bf62 100644 --- a/kb_lib/src/trade_aggregation.rs +++ b/kb_lib/src/trade_aggregation.rs @@ -183,7 +183,7 @@ impl KbTradeAggregationService { || quote_amount_raw.is_none() || price_quote_per_base.is_none()) { - let inferred_result = kb_extract_pump_swap_amounts_from_transaction( + let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas( transaction.transaction_json.as_str(), transaction.meta_json.as_deref(), base_vault_address.as_deref(), @@ -228,6 +228,31 @@ impl KbTradeAggregationService { price_quote_per_base = inferred.2; } } + if decoded_event.event_kind.starts_with("raydium_cpmm.") + && (base_amount_raw.is_none() + || quote_amount_raw.is_none() + || price_quote_per_base.is_none()) + { + let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas( + transaction.transaction_json.as_str(), + transaction.meta_json.as_deref(), + base_vault_address.as_deref(), + quote_vault_address.as_deref(), + ); + let inferred = match inferred_result { + Ok(inferred) => inferred, + Err(error) => return Err(error), + }; + if base_amount_raw.is_none() { + base_amount_raw = inferred.0; + } + if quote_amount_raw.is_none() { + quote_amount_raw = inferred.1; + } + if price_quote_per_base.is_none() { + price_quote_per_base = inferred.2; + } + } if price_quote_per_base.is_none() { price_quote_per_base = kb_compute_price_quote_per_base_with_decimals( transaction.meta_json.as_deref(), @@ -389,6 +414,12 @@ fn kb_is_trade_event_kind(event_kind: &str) -> bool { if event_kind.ends_with(".sell") { return true; } + if event_kind == "raydium_cpmm.swap_base_input" { + return true; + } + if event_kind == "raydium_cpmm.swap_base_output" { + return true; + } false } @@ -403,10 +434,15 @@ fn kb_convert_slot_to_i64(slot: std::option::Option) -> std::option::Option } fn kb_extract_trade_side(event_kind: &str, payload: &serde_json::Value) -> crate::KbSwapTradeSide { - let trade_side_option = kb_extract_string_by_candidate_keys(payload, &["tradeSide"]); + let trade_side_option = + kb_extract_string_by_candidate_keys(payload, &["tradeSide", "trade_side"]); match trade_side_option.as_deref() { Some("BuyBase") => return crate::KbSwapTradeSide::BuyBase, + Some("buy") => return crate::KbSwapTradeSide::BuyBase, + Some("BUY") => return crate::KbSwapTradeSide::BuyBase, Some("SellBase") => return crate::KbSwapTradeSide::SellBase, + Some("sell") => return crate::KbSwapTradeSide::SellBase, + Some("SELL") => return crate::KbSwapTradeSide::SellBase, _ => {} } if event_kind.ends_with(".buy") { @@ -580,7 +616,7 @@ fn kb_find_pool_token_vault_address_by_token_id( None } -fn kb_extract_pump_swap_amounts_from_transaction( +fn kb_extract_trade_amounts_from_vault_balance_deltas( transaction_json: &str, meta_json: std::option::Option<&str>, base_vault_address: std::option::Option<&str>, @@ -1000,7 +1036,7 @@ fn kb_compute_price_quote_per_base_with_decimals( base_vault_address: std::option::Option<&str>, quote_vault_address: std::option::Option<&str>, ) -> std::option::Option { - let inferred_result = kb_extract_pump_swap_amounts_from_transaction( + let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas( transaction_json, meta_json, base_vault_address,