0.7.24-pre.3

This commit is contained in:
2026-05-03 10:03:54 +02:00
parent d44171ca6f
commit 7ecf7e1af2
7 changed files with 1059 additions and 107 deletions

View File

@@ -11,47 +11,53 @@ mod orca_whirlpools;
mod pump_fun;
mod pump_swap;
mod raydium_amm_v4;
mod raydium_cpmm;
pub use dexlab::KB_DEXLAB_PROGRAM_ID;
pub use dexlab::KbDexlabCreatePoolDecoded;
pub use dexlab::KbDexlabDecodedEvent;
pub use dexlab::KbDexlabDecoder;
pub use dexlab::KbDexlabSwapDecoded;
pub use dexlab::KB_DEXLAB_PROGRAM_ID;
pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID;
pub use fluxbeam::KbFluxbeamCreatePoolDecoded;
pub use fluxbeam::KbFluxbeamDecodedEvent;
pub use fluxbeam::KbFluxbeamDecoder;
pub use fluxbeam::KbFluxbeamSwapDecoded;
pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID;
pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use meteora_damm_v1::KbMeteoraDammV1CreatePoolDecoded;
pub use meteora_damm_v1::KbMeteoraDammV1DecodedEvent;
pub use meteora_damm_v1::KbMeteoraDammV1Decoder;
pub use meteora_damm_v1::KbMeteoraDammV1SwapDecoded;
pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use meteora_damm_v2::KbMeteoraDammV2CreatePoolDecoded;
pub use meteora_damm_v2::KbMeteoraDammV2DecodedEvent;
pub use meteora_damm_v2::KbMeteoraDammV2Decoder;
pub use meteora_damm_v2::KbMeteoraDammV2SwapDecoded;
pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID;
pub use meteora_dbc::KbMeteoraDbcCreatePoolDecoded;
pub use meteora_dbc::KbMeteoraDbcDecodedEvent;
pub use meteora_dbc::KbMeteoraDbcDecoder;
pub use meteora_dbc::KbMeteoraDbcSwapDecoded;
pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID;
pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use orca_whirlpools::KbOrcaWhirlpoolsCreatePoolDecoded;
pub use orca_whirlpools::KbOrcaWhirlpoolsDecodedEvent;
pub use orca_whirlpools::KbOrcaWhirlpoolsDecoder;
pub use orca_whirlpools::KbOrcaWhirlpoolsSwapDecoded;
pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID;
pub use pump_fun::KbPumpFunCreateV2TokenDecoded;
pub use pump_fun::KbPumpFunDecodedEvent;
pub use pump_fun::KbPumpFunDecoder;
pub use pump_fun::KbPumpFunTradeDecoded;
pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID;
pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID;
pub use pump_swap::KbPumpSwapDecodedEvent;
pub use pump_swap::KbPumpSwapDecoder;
pub use pump_swap::KbPumpSwapTradeDecoded;
pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID;
pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent;
pub use raydium_amm_v4::KbRaydiumAmmV4Decoder;
pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use raydium_cpmm::KB_RAYDIUM_CPMM_PROGRAM_ID;
pub use raydium_cpmm::KbRaydiumCpmmDecodedEvent;
pub use raydium_cpmm::KbRaydiumCpmmSwapDecoded;
pub use raydium_cpmm::KbRaydiumCpmmSwapMode;
pub use raydium_cpmm::kb_decode_raydium_cpmm_instruction;

View File

@@ -0,0 +1,473 @@
// file: kb_lib/src/dex/raydium_cpmm.rs
//! Raydium CPMM decoder.
//!
//! This module decodes Raydium constant product swap instructions from
//! already-projected Solana transaction instructions.
/// Raydium CPMM mainnet program id.
pub const KB_RAYDIUM_CPMM_PROGRAM_ID: &str = "CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C";
/// Raydium CPMM `swap_base_input` discriminator.
const KB_RAYDIUM_CPMM_SWAP_BASE_INPUT_DISCRIMINATOR: [u8; 8] =
[143, 190, 90, 218, 196, 30, 51, 222];
/// Raydium CPMM `swap_base_output` discriminator.
const KB_RAYDIUM_CPMM_SWAP_BASE_OUTPUT_DISCRIMINATOR: [u8; 8] =
[55, 217, 98, 86, 163, 74, 180, 173];
/// Raydium CPMM decoded event.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum KbRaydiumCpmmDecodedEvent {
/// Swap where the user fixes the input amount.
SwapBaseInput(KbRaydiumCpmmSwapDecoded),
/// Swap where the user fixes the output amount.
SwapBaseOutput(KbRaydiumCpmmSwapDecoded),
}
/// Raydium CPMM swap mode.
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub enum KbRaydiumCpmmSwapMode {
/// Fixed input swap.
BaseInput,
/// Fixed output swap.
BaseOutput,
}
/// Raydium CPMM decoded swap.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub struct KbRaydiumCpmmSwapDecoded {
/// Instruction mode.
pub swap_mode: KbRaydiumCpmmSwapMode,
/// User or payer account.
pub payer: std::string::String,
/// Raydium authority account.
pub authority: std::string::String,
/// AMM config account.
pub amm_config: std::string::String,
/// CPMM pool state account.
pub pool_state: std::string::String,
/// User input token account.
pub input_token_account: std::string::String,
/// User output token account.
pub output_token_account: std::string::String,
/// Pool input vault.
pub input_vault: std::string::String,
/// Pool output vault.
pub output_vault: std::string::String,
/// Input token program.
pub input_token_program: std::string::String,
/// Output token program.
pub output_token_program: std::string::String,
/// Input token mint.
pub input_token_mint: std::string::String,
/// Output token mint.
pub output_token_mint: std::string::String,
/// Observation state.
pub observation_state: std::string::String,
/// Normalized base mint.
pub base_mint: std::string::String,
/// Normalized quote mint.
pub quote_mint: std::string::String,
/// Normalized base vault.
pub base_vault: std::string::String,
/// Normalized quote vault.
pub quote_vault: std::string::String,
/// True when input mint is the normalized base mint.
pub input_is_base: bool,
/// Trade side from the normalized pair perspective.
pub trade_side: std::string::String,
/// Fixed input amount, for swap_base_input.
pub amount_in_raw: std::option::Option<std::string::String>,
/// Minimum output amount, for swap_base_input.
pub minimum_amount_out_raw: std::option::Option<std::string::String>,
/// Maximum input amount, for swap_base_output.
pub max_amount_in_raw: std::option::Option<std::string::String>,
/// Fixed output amount, for swap_base_output.
pub amount_out_raw: std::option::Option<std::string::String>,
}
impl KbRaydiumCpmmDecodedEvent {
/// Returns the storage event kind.
pub fn event_kind(&self) -> &'static str {
match self {
KbRaydiumCpmmDecodedEvent::SwapBaseInput(_) => "raydium_cpmm.swap_base_input",
KbRaydiumCpmmDecodedEvent::SwapBaseOutput(_) => "raydium_cpmm.swap_base_output",
}
}
/// Returns the pool account.
pub fn pool_account(&self) -> &str {
match self {
KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.pool_state.as_str(),
KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.pool_state.as_str(),
}
}
/// Returns the normalized base mint.
pub fn base_mint(&self) -> &str {
match self {
KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.base_mint.as_str(),
KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.base_mint.as_str(),
}
}
/// Returns the normalized quote mint.
pub fn quote_mint(&self) -> &str {
match self {
KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => event.quote_mint.as_str(),
KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => event.quote_mint.as_str(),
}
}
/// Converts the decoded event to JSON payload.
pub fn to_payload_json(&self) -> std::option::Option<std::string::String> {
match self {
crate::KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => {
let result = serde_json::to_string(event);
match result {
Ok(payload) => Some(payload),
Err(_) => None,
}
}
crate::KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => {
let result = serde_json::to_string(event);
match result {
Ok(payload) => Some(payload),
Err(_) => None,
}
}
}
}
}
/// Decodes one Raydium CPMM instruction from projected instruction fields.
pub fn kb_decode_raydium_cpmm_instruction(
accounts_json: &str,
data_json: &str,
) -> std::vec::Vec<KbRaydiumCpmmDecodedEvent> {
let accounts = match kb_parse_accounts_json(accounts_json) {
Some(accounts) => accounts,
None => return std::vec::Vec::new(),
};
let data_base58 = match kb_parse_data_json_as_base58(data_json) {
Some(data_base58) => data_base58,
None => return std::vec::Vec::new(),
};
let data = match bs58::decode(data_base58.as_str()).into_vec() {
Ok(data) => data,
Err(_) => return std::vec::Vec::new(),
};
if data.len() < 24 {
return std::vec::Vec::new();
}
let discriminator = [
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
];
if discriminator == KB_RAYDIUM_CPMM_SWAP_BASE_INPUT_DISCRIMINATOR {
let amount_in = match kb_read_u64_le(data.as_slice(), 8) {
Some(value) => value,
None => return std::vec::Vec::new(),
};
let minimum_amount_out = match kb_read_u64_le(data.as_slice(), 16) {
Some(value) => value,
None => return std::vec::Vec::new(),
};
let swap = match kb_build_raydium_cpmm_swap(
KbRaydiumCpmmSwapMode::BaseInput,
accounts.as_slice(),
Some(amount_in.to_string()),
Some(minimum_amount_out.to_string()),
None,
None,
) {
Some(swap) => swap,
None => return std::vec::Vec::new(),
};
return vec![KbRaydiumCpmmDecodedEvent::SwapBaseInput(swap)];
}
if discriminator == KB_RAYDIUM_CPMM_SWAP_BASE_OUTPUT_DISCRIMINATOR {
let max_amount_in = match kb_read_u64_le(data.as_slice(), 8) {
Some(value) => value,
None => return std::vec::Vec::new(),
};
let amount_out = match kb_read_u64_le(data.as_slice(), 16) {
Some(value) => value,
None => return std::vec::Vec::new(),
};
let swap = match kb_build_raydium_cpmm_swap(
KbRaydiumCpmmSwapMode::BaseOutput,
accounts.as_slice(),
None,
None,
Some(max_amount_in.to_string()),
Some(amount_out.to_string()),
) {
Some(swap) => swap,
None => return std::vec::Vec::new(),
};
return vec![KbRaydiumCpmmDecodedEvent::SwapBaseOutput(swap)];
}
std::vec::Vec::new()
}
fn kb_build_raydium_cpmm_swap(
swap_mode: KbRaydiumCpmmSwapMode,
accounts: &[std::string::String],
amount_in_raw: std::option::Option<std::string::String>,
minimum_amount_out_raw: std::option::Option<std::string::String>,
max_amount_in_raw: std::option::Option<std::string::String>,
amount_out_raw: std::option::Option<std::string::String>,
) -> std::option::Option<KbRaydiumCpmmSwapDecoded> {
if accounts.len() < 13 {
return None;
}
let input_token_mint = accounts[10].clone();
let output_token_mint = accounts[11].clone();
let input_vault = accounts[6].clone();
let output_vault = accounts[7].clone();
let normalized = kb_normalize_raydium_cpmm_pair(
input_token_mint.as_str(),
output_token_mint.as_str(),
input_vault.as_str(),
output_vault.as_str(),
);
let input_is_base = normalized.input_is_base;
let trade_side = if input_is_base {
"sell".to_string()
} else {
"buy".to_string()
};
Some(KbRaydiumCpmmSwapDecoded {
swap_mode,
payer: accounts[0].clone(),
authority: accounts[1].clone(),
amm_config: accounts[2].clone(),
pool_state: accounts[3].clone(),
input_token_account: accounts[4].clone(),
output_token_account: accounts[5].clone(),
input_vault,
output_vault,
input_token_program: accounts[8].clone(),
output_token_program: accounts[9].clone(),
input_token_mint,
output_token_mint,
observation_state: accounts[12].clone(),
base_mint: normalized.base_mint,
quote_mint: normalized.quote_mint,
base_vault: normalized.base_vault,
quote_vault: normalized.quote_vault,
input_is_base,
trade_side,
amount_in_raw,
minimum_amount_out_raw,
max_amount_in_raw,
amount_out_raw,
})
}
struct KbRaydiumCpmmNormalizedPair {
base_mint: std::string::String,
quote_mint: std::string::String,
base_vault: std::string::String,
quote_vault: std::string::String,
input_is_base: bool,
}
fn kb_normalize_raydium_cpmm_pair(
input_mint: &str,
output_mint: &str,
input_vault: &str,
output_vault: &str,
) -> KbRaydiumCpmmNormalizedPair {
if kb_is_quote_mint(output_mint) && !kb_is_quote_mint(input_mint) {
return KbRaydiumCpmmNormalizedPair {
base_mint: input_mint.to_string(),
quote_mint: output_mint.to_string(),
base_vault: input_vault.to_string(),
quote_vault: output_vault.to_string(),
input_is_base: true,
};
}
if kb_is_quote_mint(input_mint) && !kb_is_quote_mint(output_mint) {
return KbRaydiumCpmmNormalizedPair {
base_mint: output_mint.to_string(),
quote_mint: input_mint.to_string(),
base_vault: output_vault.to_string(),
quote_vault: input_vault.to_string(),
input_is_base: false,
};
}
if input_mint <= output_mint {
return KbRaydiumCpmmNormalizedPair {
base_mint: input_mint.to_string(),
quote_mint: output_mint.to_string(),
base_vault: input_vault.to_string(),
quote_vault: output_vault.to_string(),
input_is_base: true,
};
}
KbRaydiumCpmmNormalizedPair {
base_mint: output_mint.to_string(),
quote_mint: input_mint.to_string(),
base_vault: output_vault.to_string(),
quote_vault: input_vault.to_string(),
input_is_base: false,
}
}
fn kb_is_quote_mint(mint: &str) -> bool {
if mint == "So11111111111111111111111111111111111111112" {
return true;
}
if mint == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" {
return true;
}
if mint == "Es9vMFrzaCERmJfrF4H2FYD4KMZbsJyNeYrBbqD6CbK" {
return true;
}
if mint == "USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB" {
return true;
}
false
}
fn kb_parse_accounts_json(
accounts_json: &str,
) -> std::option::Option<std::vec::Vec<std::string::String>> {
let result = serde_json::from_str::<std::vec::Vec<std::string::String>>(accounts_json);
match result {
Ok(accounts) => Some(accounts),
Err(_) => None,
}
}
fn kb_parse_data_json_as_base58(data_json: &str) -> std::option::Option<std::string::String> {
let json_string_result = serde_json::from_str::<std::string::String>(data_json);
match json_string_result {
Ok(value) => return Some(value),
Err(_) => {}
}
let trimmed = data_json.trim();
if trimmed.is_empty() {
return None;
}
let without_quotes = trimmed.trim_matches('"');
if without_quotes.is_empty() {
return None;
}
Some(without_quotes.to_string())
}
fn kb_read_u64_le(data: &[u8], offset: usize) -> std::option::Option<u64> {
if data.len() < offset + 8 {
return None;
}
let bytes = [
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
];
Some(u64::from_le_bytes(bytes))
}
#[cfg(test)]
mod tests {
#[test]
fn decodes_swap_base_input() {
let accounts_json = r#"[
"ARu4n5mFdZogZAravu7CcizaojWnS6oqka37gdLT5SZn",
"GpMZbSM2GgvTKHJirzeGfMFoaZ8UR2X7F4v8vHTvxFbL",
"GtdipqAcw8eAYxGcANs3vpN7UaN7UH7u89Kd8wPqLThd",
"2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV",
"3tBz6MmUTaf2QdnAnYYJGDpgbTXdANP9v4CtUL42GZaZ",
"C1qMskwQhQ4cn1M6RG8uWfUuMrq6ysRPEHccGxk4tjw8",
"8aHJfDQBmngBpFwLysD1bhj8LKh7ywbxfUxsy7VQkx22",
"A4jPLDGrkAogPgm8KqQzZQ4FoNJuKuKMCbNqgQkpT5Ci",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk",
"USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB",
"9fzQ17bEnqJSsyHL5CodJqY2sdjZJBQCQLbgFZ1BYTWn"
]"#;
let events = crate::kb_decode_raydium_cpmm_instruction(
accounts_json,
r#""E73fXHPWvSRF4Q2ZQFvPoeJBVGDUEMmxB""#,
);
assert_eq!(events.len(), 1);
match &events[0] {
crate::KbRaydiumCpmmDecodedEvent::SwapBaseInput(event) => {
assert_eq!(
event.pool_state,
"2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV"
);
assert_eq!(
event.base_mint,
"Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk"
);
assert_eq!(
event.quote_mint,
"USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB"
);
assert_eq!(event.input_is_base, true);
assert_eq!(event.trade_side, "sell");
assert_eq!(event.amount_in_raw.is_some(), true);
assert_eq!(event.minimum_amount_out_raw.is_some(), true);
}
_ => {
panic!("expected swap base input");
}
}
}
#[test]
fn decodes_swap_base_output() {
let accounts_json = r#"[
"JD6rVaerbyz6wjQ433nrw6bFTgFrp46MiYmi8EtUAfsG",
"GpMZbSM2GgvTKHJirzeGfMFoaZ8UR2X7F4v8vHTvxFbL",
"GtdipqAcw8eAYxGcANs3vpN7UaN7UH7u89Kd8wPqLThd",
"2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV",
"4Fg913oAKp7BiW6WvUF32RvACsBYusMTQ16nmA6Sd9tp",
"Cvcy56pbaDms71KyxAWFTQ24n5KbCy8KNa7zMhKGcJa6",
"A4jPLDGrkAogPgm8KqQzZQ4FoNJuKuKMCbNqgQkpT5Ci",
"8aHJfDQBmngBpFwLysD1bhj8LKh7ywbxfUxsy7VQkx22",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB",
"Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk",
"9fzQ17bEnqJSsyHL5CodJqY2sdjZJBQCQLbgFZ1BYTWn"
]"#;
let events = crate::kb_decode_raydium_cpmm_instruction(
accounts_json,
r#""66JafaVu7KNEtTngqQyD7ETVurF3rxJ47""#,
);
assert_eq!(events.len(), 1);
match &events[0] {
crate::KbRaydiumCpmmDecodedEvent::SwapBaseOutput(event) => {
assert_eq!(
event.pool_state,
"2ErXvV1tKtG3wiHqdofDjMou7Jusdsfasvfh8HrTj5oV"
);
assert_eq!(
event.base_mint,
"Pf9aSicGu3g6tTUBqrRbjNsGape9HopibspX5KSbonk"
);
assert_eq!(
event.quote_mint,
"USD1ttGY1N17NEEHLmELoaybftRBUSErhqYiQzvEmuB"
);
assert_eq!(event.input_is_base, false);
assert_eq!(event.trade_side, "buy");
assert_eq!(event.max_amount_in_raw.is_some(), true);
assert_eq!(event.amount_out_raw.is_some(), true);
}
_ => {
panic!("expected swap base output");
}
}
}
}

View File

@@ -86,7 +86,7 @@ impl KbDexDecodeService {
};
for decoded_event in &raydium_decoded {
let persist_result = self
.persist_raydium_event(&transaction, decoded_event)
.persist_raydium_amm_v4_event(&transaction, decoded_event)
.await;
let persisted_event = match persist_result {
Ok(persisted_event) => persisted_event,
@@ -94,6 +94,16 @@ impl KbDexDecodeService {
};
persisted.push(persisted_event);
}
let raydium_cpmm_persisted_result = self
.decode_and_persist_raydium_cpmm_events(&transaction, &instructions)
.await;
let raydium_cpmm_persisted = match raydium_cpmm_persisted_result {
Ok(raydium_cpmm_persisted) => raydium_cpmm_persisted,
Err(error) => return Err(error),
};
for persisted_event in raydium_cpmm_persisted {
persisted.push(persisted_event);
}
let pump_fun_decoded_result = self
.pump_fun_decoder
.decode_transaction(&transaction, &instructions);
@@ -1414,7 +1424,7 @@ impl KbDexDecodeService {
}
}
async fn persist_raydium_event(
async fn persist_raydium_amm_v4_event(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbRaydiumAmmV4DecodedEvent,
@@ -1517,6 +1527,168 @@ impl KbDexDecodeService {
}
}
async fn decode_and_persist_raydium_cpmm_events(
&self,
transaction: &crate::KbChainTransactionDto,
instructions: &[crate::KbChainInstructionDto],
) -> Result<std::vec::Vec<crate::KbDexDecodedEventDto>, crate::KbError> {
let mut persisted = std::vec::Vec::new();
for instruction in instructions {
let program_id = match instruction.program_id.as_ref() {
Some(program_id) => program_id,
None => continue,
};
if program_id.as_str() != crate::KB_RAYDIUM_CPMM_PROGRAM_ID {
continue;
}
let data_json = match instruction.data_json.as_ref() {
Some(data_json) => data_json,
None => continue,
};
let decoded_events = crate::kb_decode_raydium_cpmm_instruction(
instruction.accounts_json.as_str(),
data_json.as_str(),
);
for decoded_event in &decoded_events {
let persist_result = self
.persist_raydium_cpmm_event(transaction, instruction, decoded_event)
.await;
let persisted_event = match persist_result {
Ok(persisted_event) => persisted_event,
Err(error) => return Err(error),
};
persisted.push(persisted_event);
}
}
Ok(persisted)
}
async fn persist_raydium_cpmm_event(
&self,
transaction: &crate::KbChainTransactionDto,
instruction: &crate::KbChainInstructionDto,
decoded_event: &crate::KbRaydiumCpmmDecodedEvent,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
let transaction_id = match transaction.id {
Some(transaction_id) => transaction_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"transaction '{}' has no internal id",
transaction.signature
)));
}
};
let instruction_id = match instruction.id {
Some(instruction_id) => instruction_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"raydium cpmm instruction for transaction '{}' has no internal id",
transaction.signature
)));
}
};
let payload_json = match decoded_event.to_payload_json() {
Some(payload_json) => payload_json,
None => {
return Err(crate::KbError::Json(
"cannot serialize decoded raydium cpmm payload".to_string(),
));
}
};
let event_kind = decoded_event.event_kind().to_string();
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
transaction_id,
Some(instruction_id),
event_kind.as_str(),
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::KbDexDecodedEventDto::new(
transaction_id,
Some(instruction_id),
"raydium_cpmm".to_string(),
crate::KB_RAYDIUM_CPMM_PROGRAM_ID.to_string(),
event_kind.clone(),
Some(decoded_event.pool_account().to_string()),
None,
Some(decoded_event.base_mint().to_string()),
Some(decoded_event.quote_mint().to_string()),
None,
payload_json.clone(),
);
let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
transaction_id,
Some(instruction_id),
event_kind.as_str(),
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::KbError::InvalidState(
"decoded raydium cpmm event disappeared after upsert".to_string(),
));
}
};
if !already_present {
let payload_value_result =
serde_json::from_str::<serde_json::Value>(payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse raydium cpmm payload after serialization: {}",
error
)));
}
};
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
format!("dex.{}", event_kind),
crate::KbObservationSourceKind::HttpRpc,
transaction.source_endpoint_name.clone(),
transaction.signature.clone(),
transaction.slot,
payload_value.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
format!("signal.dex.{}", event_kind),
crate::KbAnalysisSignalSeverity::Low,
transaction.signature.clone(),
Some(observation_id),
None,
payload_value,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(fetched)
}
async fn persist_pump_fun_event(
&self,
transaction: &crate::KbChainTransactionDto,

View File

@@ -93,6 +93,19 @@ impl KbDexDetectService {
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "raydium_cpmm"
&& (decoded_event.event_kind == "raydium_cpmm.swap_base_input"
|| decoded_event.event_kind == "raydium_cpmm.swap_base_output")
{
let detect_result = self
.detect_raydium_cpmm_trade(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "pump_fun"
&& decoded_event.event_kind == "pump_fun.create_v2_token"
{
@@ -2735,6 +2748,238 @@ impl KbDexDetectService {
})
}
async fn detect_raydium_cpmm_trade(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbDexDecodedEventDto,
) -> Result<crate::KbDexPoolDetectionResult, crate::KbError> {
let decoded_event_id = match decoded_event.id {
Some(decoded_event_id) => decoded_event_id,
None => {
return Err(crate::KbError::InvalidState(
"decoded dex event has no internal id".to_string(),
));
}
};
let dex_id_result = self.ensure_raydium_cpmm_dex().await;
let dex_id = match dex_id_result {
Ok(dex_id) => dex_id,
Err(error) => return Err(error),
};
let pool_address = match decoded_event.pool_account.clone() {
Some(pool_address) => pool_address,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no pool_account",
decoded_event_id
)));
}
};
let base_mint = match decoded_event.token_a_mint.clone() {
Some(base_mint) => base_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_a_mint",
decoded_event_id
)));
}
};
let quote_mint = match decoded_event.token_b_mint.clone() {
Some(quote_mint) => quote_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_b_mint",
decoded_event_id
)));
}
};
let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => return Err(error),
};
let base_vault_address = kb_extract_payload_string_field(&payload_value, "base_vault");
let quote_vault_address = kb_extract_payload_string_field(&payload_value, "quote_vault");
let base_token_id_result = self.ensure_token(base_mint.as_str()).await;
let base_token_id = match base_token_id_result {
Ok(base_token_id) => base_token_id,
Err(error) => return Err(error),
};
let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await;
let quote_token_id = match quote_token_id_result {
Ok(quote_token_id) => quote_token_id,
Err(error) => return Err(error),
};
let existing_pool_result =
crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await;
let existing_pool_option = match existing_pool_result {
Ok(existing_pool_option) => existing_pool_option,
Err(error) => return Err(error),
};
let created_pool = existing_pool_option.is_none();
let pool_id = match existing_pool_option {
Some(pool) => match pool.id {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
}
},
None => {
let pool_dto = crate::KbPoolDto::new(
dex_id,
pool_address.clone(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Active,
);
let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await;
match upsert_result {
Ok(pool_id) => pool_id,
Err(error) => return Err(error),
}
}
};
let existing_pair_result =
crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_pair_option = match existing_pair_result {
Ok(existing_pair_option) => existing_pair_option,
Err(error) => return Err(error),
};
let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_dto =
crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol);
let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
let pair_id = match pair_id_result {
Ok(pair_id) => pair_id,
Err(error) => return Err(error),
};
let upsert_base_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
base_token_id,
crate::KbPoolTokenRole::Base,
base_vault_address,
Some(0),
),
)
.await;
if let Err(error) = upsert_base_pool_token_result {
return Err(error);
}
let upsert_quote_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Quote,
quote_vault_address,
Some(1),
),
)
.await;
if let Err(error) = upsert_quote_pool_token_result {
return Err(error);
}
let existing_listing_result =
crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_listing_option = match existing_listing_result {
Ok(existing_listing_option) => existing_listing_option,
Err(error) => return Err(error),
};
let created_listing = existing_listing_option.is_none();
let pool_listing_id = match existing_listing_option {
Some(pool_listing) => pool_listing.id,
None => {
let listing_id_result = self
.upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction)
.await;
match listing_id_result {
Ok(listing_id) => Some(listing_id),
Err(error) => return Err(error),
}
}
};
if created_pool {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_cpmm.new_pool",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_pair {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_cpmm.new_pair",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_listing {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_cpmm.first_listing_seen",
crate::KbAnalysisSignalSeverity::Low,
payload_value,
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(crate::KbDexPoolDetectionResult {
decoded_event_id,
dex_id,
pool_id,
pair_id,
pool_listing_id,
created_pool,
created_pair,
created_listing,
})
}
async fn ensure_raydium_cpmm_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium_cpmm").await;
let dex_option = match dex_result {
Ok(dex_option) => dex_option,
Err(error) => return Err(error),
};
match dex_option {
Some(dex) => match dex.id {
Some(dex_id) => Ok(dex_id),
None => Err(crate::KbError::InvalidState(
"raydium_cpmm dex has no internal id".to_string(),
)),
},
None => {
let dex_dto = crate::KbDexDto::new(
"raydium_cpmm".to_string(),
"Raydium CPMM".to_string(),
Some(crate::KB_RAYDIUM_CPMM_PROGRAM_ID.to_string()),
None,
true,
);
crate::upsert_dex(self.database.as_ref(), &dex_dto).await
}
}
}
async fn ensure_dexlab_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "dexlab").await;
let dex_option = match dex_result {
@@ -3160,6 +3405,21 @@ fn kb_extract_pump_swap_vault_addresses(
(token_a_vault_address, token_b_vault_address)
}
fn kb_extract_payload_string_field(
payload_value: &serde_json::Value,
field_name: &str,
) -> std::option::Option<std::string::String> {
let value_option = payload_value.get(field_name);
let value = match value_option {
Some(value) => value,
None => return None,
};
match value.as_str() {
Some(value) => Some(value.to_string()),
None => None,
}
}
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc<crate::KbDatabase> {

View File

@@ -49,6 +49,82 @@ pub use config::KbSolanaConfig;
pub use config::KbSqliteDatabaseConfig;
pub use config::KbWsEndpointConfig;
pub use constants::*;
pub use db::KbAnalysisSignalDto;
pub use db::KbAnalysisSignalEntity;
pub use db::KbAnalysisSignalSeverity;
pub use db::KbChainInstructionDto;
pub use db::KbChainInstructionEntity;
pub use db::KbChainSlotDto;
pub use db::KbChainSlotEntity;
pub use db::KbChainTransactionDto;
pub use db::KbChainTransactionEntity;
pub use db::KbDatabase;
pub use db::KbDatabaseBackend;
pub use db::KbDatabaseConnection;
pub use db::KbDbMetadataDto;
pub use db::KbDbMetadataEntity;
pub use db::KbDbRuntimeEventDto;
pub use db::KbDbRuntimeEventEntity;
pub use db::KbDbRuntimeEventLevel;
pub use db::KbDexDecodedEventDto;
pub use db::KbDexDecodedEventEntity;
pub use db::KbDexDto;
pub use db::KbDexEntity;
pub use db::KbKnownHttpEndpointDto;
pub use db::KbKnownHttpEndpointEntity;
pub use db::KbKnownWsEndpointDto;
pub use db::KbKnownWsEndpointEntity;
pub use db::KbLaunchAttributionDto;
pub use db::KbLaunchAttributionEntity;
pub use db::KbLaunchSurfaceDto;
pub use db::KbLaunchSurfaceEntity;
pub use db::KbLaunchSurfaceKeyDto;
pub use db::KbLaunchSurfaceKeyEntity;
pub use db::KbLiquidityEventDto;
pub use db::KbLiquidityEventEntity;
pub use db::KbLiquidityEventKind;
pub use db::KbObservationSourceKind;
pub use db::KbObservedTokenDto;
pub use db::KbObservedTokenEntity;
pub use db::KbObservedTokenStatus;
pub use db::KbOnchainObservationDto;
pub use db::KbOnchainObservationEntity;
pub use db::KbPairAnalyticSignalDto;
pub use db::KbPairAnalyticSignalEntity;
pub use db::KbPairCandleDto;
pub use db::KbPairCandleEntity;
pub use db::KbPairDto;
pub use db::KbPairEntity;
pub use db::KbPairMetricDto;
pub use db::KbPairMetricEntity;
pub use db::KbPoolDto;
pub use db::KbPoolEntity;
pub use db::KbPoolKind;
pub use db::KbPoolListingDto;
pub use db::KbPoolListingEntity;
pub use db::KbPoolOriginDto;
pub use db::KbPoolOriginEntity;
pub use db::KbPoolStatus;
pub use db::KbPoolTokenDto;
pub use db::KbPoolTokenEntity;
pub use db::KbPoolTokenRole;
pub use db::KbSwapDto;
pub use db::KbSwapEntity;
pub use db::KbSwapTradeSide;
pub use db::KbTokenBurnEventDto;
pub use db::KbTokenBurnEventEntity;
pub use db::KbTokenDto;
pub use db::KbTokenEntity;
pub use db::KbTokenMintEventDto;
pub use db::KbTokenMintEventEntity;
pub use db::KbTradeEventDto;
pub use db::KbTradeEventEntity;
pub use db::KbWalletDto;
pub use db::KbWalletEntity;
pub use db::KbWalletHoldingDto;
pub use db::KbWalletHoldingEntity;
pub use db::KbWalletParticipationDto;
pub use db::KbWalletParticipationEntity;
pub use db::delete_chain_instructions_by_transaction_id;
pub use db::get_chain_slot;
pub use db::get_chain_transaction_by_signature;
@@ -139,82 +215,6 @@ pub use db::upsert_trade_event;
pub use db::upsert_wallet;
pub use db::upsert_wallet_holding;
pub use db::upsert_wallet_participation;
pub use db::KbAnalysisSignalDto;
pub use db::KbAnalysisSignalEntity;
pub use db::KbAnalysisSignalSeverity;
pub use db::KbChainInstructionDto;
pub use db::KbChainInstructionEntity;
pub use db::KbChainSlotDto;
pub use db::KbChainSlotEntity;
pub use db::KbChainTransactionDto;
pub use db::KbChainTransactionEntity;
pub use db::KbDatabase;
pub use db::KbDatabaseBackend;
pub use db::KbDatabaseConnection;
pub use db::KbDbMetadataDto;
pub use db::KbDbMetadataEntity;
pub use db::KbDbRuntimeEventDto;
pub use db::KbDbRuntimeEventEntity;
pub use db::KbDbRuntimeEventLevel;
pub use db::KbDexDecodedEventDto;
pub use db::KbDexDecodedEventEntity;
pub use db::KbDexDto;
pub use db::KbDexEntity;
pub use db::KbKnownHttpEndpointDto;
pub use db::KbKnownHttpEndpointEntity;
pub use db::KbKnownWsEndpointDto;
pub use db::KbKnownWsEndpointEntity;
pub use db::KbLaunchAttributionDto;
pub use db::KbLaunchAttributionEntity;
pub use db::KbLaunchSurfaceDto;
pub use db::KbLaunchSurfaceEntity;
pub use db::KbLaunchSurfaceKeyDto;
pub use db::KbLaunchSurfaceKeyEntity;
pub use db::KbLiquidityEventDto;
pub use db::KbLiquidityEventEntity;
pub use db::KbLiquidityEventKind;
pub use db::KbObservationSourceKind;
pub use db::KbObservedTokenDto;
pub use db::KbObservedTokenEntity;
pub use db::KbObservedTokenStatus;
pub use db::KbOnchainObservationDto;
pub use db::KbOnchainObservationEntity;
pub use db::KbPairAnalyticSignalDto;
pub use db::KbPairAnalyticSignalEntity;
pub use db::KbPairCandleDto;
pub use db::KbPairCandleEntity;
pub use db::KbPairDto;
pub use db::KbPairEntity;
pub use db::KbPairMetricDto;
pub use db::KbPairMetricEntity;
pub use db::KbPoolDto;
pub use db::KbPoolEntity;
pub use db::KbPoolKind;
pub use db::KbPoolListingDto;
pub use db::KbPoolListingEntity;
pub use db::KbPoolOriginDto;
pub use db::KbPoolOriginEntity;
pub use db::KbPoolStatus;
pub use db::KbPoolTokenDto;
pub use db::KbPoolTokenEntity;
pub use db::KbPoolTokenRole;
pub use db::KbSwapDto;
pub use db::KbSwapEntity;
pub use db::KbSwapTradeSide;
pub use db::KbTokenBurnEventDto;
pub use db::KbTokenBurnEventEntity;
pub use db::KbTokenDto;
pub use db::KbTokenEntity;
pub use db::KbTokenMintEventDto;
pub use db::KbTokenMintEventEntity;
pub use db::KbTradeEventDto;
pub use db::KbTradeEventEntity;
pub use db::KbWalletDto;
pub use db::KbWalletEntity;
pub use db::KbWalletHoldingDto;
pub use db::KbWalletHoldingEntity;
pub use db::KbWalletParticipationDto;
pub use db::KbWalletParticipationEntity;
pub use detect::KbDetectionObservationInput;
pub use detect::KbDetectionPersistenceService;
pub use detect::KbDetectionPoolCandidateInput;
@@ -227,6 +227,16 @@ pub use detect::KbSolanaWsDetectionService;
pub use detect::KbWsDetectionNotificationEnvelope;
pub use detect::KbWsDetectionRelay;
pub use detect::KbWsDetectionRelayStats;
pub use dex::KB_DEXLAB_PROGRAM_ID;
pub use dex::KB_FLUXBEAM_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use dex::KB_METEORA_DBC_PROGRAM_ID;
pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use dex::KB_PUMP_FUN_PROGRAM_ID;
pub use dex::KB_PUMP_SWAP_PROGRAM_ID;
pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use dex::KB_RAYDIUM_CPMM_PROGRAM_ID;
pub use dex::KbDexlabCreatePoolDecoded;
pub use dex::KbDexlabDecodedEvent;
pub use dex::KbDexlabDecoder;
@@ -261,21 +271,14 @@ pub use dex::KbPumpSwapTradeDecoded;
pub use dex::KbRaydiumAmmV4DecodedEvent;
pub use dex::KbRaydiumAmmV4Decoder;
pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use dex::KB_DEXLAB_PROGRAM_ID;
pub use dex::KB_FLUXBEAM_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use dex::KB_METEORA_DBC_PROGRAM_ID;
pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use dex::KB_PUMP_FUN_PROGRAM_ID;
pub use dex::KB_PUMP_SWAP_PROGRAM_ID;
pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use dex::KbRaydiumCpmmDecodedEvent;
pub use dex::KbRaydiumCpmmSwapDecoded;
pub use dex::KbRaydiumCpmmSwapMode;
pub use dex::kb_decode_raydium_cpmm_instruction;
pub use dex_decode::KbDexDecodeService;
pub use dex_detect::KbDexDetectService;
pub use dex_detect::KbDexPoolDetectionResult;
pub use error::KbError;
pub use http_client::parse_kb_json_rpc_http_response_text;
pub use http_client::parse_kb_json_rpc_http_response_value;
pub use http_client::HttpClient;
pub use http_client::KbHttpEndpointStatus;
pub use http_client::KbHttpMethodClass;
@@ -284,11 +287,10 @@ pub use http_client::KbJsonRpcHttpErrorResponse;
pub use http_client::KbJsonRpcHttpRequest;
pub use http_client::KbJsonRpcHttpResponse;
pub use http_client::KbJsonRpcHttpSuccessResponse;
pub use http_client::parse_kb_json_rpc_http_response_text;
pub use http_client::parse_kb_json_rpc_http_response_value;
pub use http_pool::HttpEndpointPool;
pub use http_pool::KbHttpPoolClientSnapshot;
pub use json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use json_rpc_ws::KbJsonRpcWsErrorObject;
pub use json_rpc_ws::KbJsonRpcWsErrorResponse;
pub use json_rpc_ws::KbJsonRpcWsIncomingMessage;
@@ -296,6 +298,9 @@ pub use json_rpc_ws::KbJsonRpcWsNotification;
pub use json_rpc_ws::KbJsonRpcWsNotificationParams;
pub use json_rpc_ws::KbJsonRpcWsRequest;
pub use json_rpc_ws::KbJsonRpcWsSuccessResponse;
pub use json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use launch_origin::KbLaunchAttributionResult;
pub use launch_origin::KbLaunchOriginService;
pub use pair_analytic_signal::KbPairAnalyticSignalResult;
@@ -305,14 +310,14 @@ pub use pair_candle_aggregation::KbPairCandleAggregationService;
pub use pair_candle_query::KbPairCandleQueryService;
pub use pool_origin::KbPoolOriginResult;
pub use pool_origin::KbPoolOriginService;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use token_backfill::KbPoolBackfillResult;
pub use token_backfill::KbTokenBackfillResult;
pub use token_backfill::KbTokenBackfillService;
pub use tracing::init_tracing;
pub use tracing::KbTracingGuard;
pub use tracing::init_tracing;
pub use trade_aggregation::KbTradeAggregationResult;
pub use trade_aggregation::KbTradeAggregationService;
pub use tx_model::KbTransactionModelService;

View File

@@ -183,7 +183,7 @@ impl KbTradeAggregationService {
|| quote_amount_raw.is_none()
|| price_quote_per_base.is_none())
{
let inferred_result = kb_extract_pump_swap_amounts_from_transaction(
let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas(
transaction.transaction_json.as_str(),
transaction.meta_json.as_deref(),
base_vault_address.as_deref(),
@@ -228,6 +228,31 @@ impl KbTradeAggregationService {
price_quote_per_base = inferred.2;
}
}
if decoded_event.event_kind.starts_with("raydium_cpmm.")
&& (base_amount_raw.is_none()
|| quote_amount_raw.is_none()
|| price_quote_per_base.is_none())
{
let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas(
transaction.transaction_json.as_str(),
transaction.meta_json.as_deref(),
base_vault_address.as_deref(),
quote_vault_address.as_deref(),
);
let inferred = match inferred_result {
Ok(inferred) => inferred,
Err(error) => return Err(error),
};
if base_amount_raw.is_none() {
base_amount_raw = inferred.0;
}
if quote_amount_raw.is_none() {
quote_amount_raw = inferred.1;
}
if price_quote_per_base.is_none() {
price_quote_per_base = inferred.2;
}
}
if price_quote_per_base.is_none() {
price_quote_per_base = kb_compute_price_quote_per_base_with_decimals(
transaction.meta_json.as_deref(),
@@ -389,6 +414,12 @@ fn kb_is_trade_event_kind(event_kind: &str) -> bool {
if event_kind.ends_with(".sell") {
return true;
}
if event_kind == "raydium_cpmm.swap_base_input" {
return true;
}
if event_kind == "raydium_cpmm.swap_base_output" {
return true;
}
false
}
@@ -403,10 +434,15 @@ fn kb_convert_slot_to_i64(slot: std::option::Option<u64>) -> std::option::Option
}
fn kb_extract_trade_side(event_kind: &str, payload: &serde_json::Value) -> crate::KbSwapTradeSide {
let trade_side_option = kb_extract_string_by_candidate_keys(payload, &["tradeSide"]);
let trade_side_option =
kb_extract_string_by_candidate_keys(payload, &["tradeSide", "trade_side"]);
match trade_side_option.as_deref() {
Some("BuyBase") => return crate::KbSwapTradeSide::BuyBase,
Some("buy") => return crate::KbSwapTradeSide::BuyBase,
Some("BUY") => return crate::KbSwapTradeSide::BuyBase,
Some("SellBase") => return crate::KbSwapTradeSide::SellBase,
Some("sell") => return crate::KbSwapTradeSide::SellBase,
Some("SELL") => return crate::KbSwapTradeSide::SellBase,
_ => {}
}
if event_kind.ends_with(".buy") {
@@ -580,7 +616,7 @@ fn kb_find_pool_token_vault_address_by_token_id(
None
}
fn kb_extract_pump_swap_amounts_from_transaction(
fn kb_extract_trade_amounts_from_vault_balance_deltas(
transaction_json: &str,
meta_json: std::option::Option<&str>,
base_vault_address: std::option::Option<&str>,
@@ -1000,7 +1036,7 @@ fn kb_compute_price_quote_per_base_with_decimals(
base_vault_address: std::option::Option<&str>,
quote_vault_address: std::option::Option<&str>,
) -> std::option::Option<f64> {
let inferred_result = kb_extract_pump_swap_amounts_from_transaction(
let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas(
transaction_json,
meta_json,
base_vault_address,