This commit is contained in:
2026-05-03 18:05:32 +02:00
parent 29ebf6b123
commit 3e994995d7
8 changed files with 1765 additions and 145 deletions

View File

@@ -11,6 +11,7 @@ mod orca_whirlpools;
mod pump_fun;
mod pump_swap;
mod raydium_amm_v4;
mod raydium_clmm;
mod raydium_cpmm;
pub use dexlab::KB_DEXLAB_PROGRAM_ID;
@@ -56,6 +57,10 @@ pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent;
pub use raydium_amm_v4::KbRaydiumAmmV4Decoder;
pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use raydium_clmm::KB_RAYDIUM_CLMM_PROGRAM_ID;
pub use raydium_clmm::KbRaydiumClmmDecodedEvent;
pub use raydium_clmm::KbRaydiumClmmSwapV2Decoded;
pub use raydium_clmm::kb_decode_raydium_clmm_instruction;
pub use raydium_cpmm::KB_RAYDIUM_CPMM_PROGRAM_ID;
pub use raydium_cpmm::KbRaydiumCpmmDecodedEvent;
pub use raydium_cpmm::KbRaydiumCpmmSwapDecoded;

View File

@@ -0,0 +1,500 @@
// file: kb_lib/src/dex/raydium_clmm.rs
//! Raydium CLMM instruction decoder.
/// Raydium CLMM program id.
pub const KB_RAYDIUM_CLMM_PROGRAM_ID: &str = "CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK";
const KB_RAYDIUM_CLMM_SWAP_V2_DISCRIMINATOR: [u8; 8] = [43, 4, 237, 11, 26, 201, 30, 98];
const KB_RAYDIUM_CLMM_SWAP_LEGACY_DISCRIMINATOR: [u8; 8] = [248, 198, 158, 145, 225, 117, 135, 200];
/// Decoded Raydium CLMM event.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub enum KbRaydiumClmmDecodedEvent {
/// Raydium CLMM swap_v2 event.
SwapV2(crate::KbRaydiumClmmSwapV2Decoded),
}
impl KbRaydiumClmmDecodedEvent {
/// Returns the normalized event kind.
pub fn event_kind(&self) -> &'static str {
match self {
crate::KbRaydiumClmmDecodedEvent::SwapV2(_) => "raydium_clmm.swap_v2",
}
}
/// Returns the pool account.
pub fn pool_account(&self) -> &str {
match self {
crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.pool_state.as_str(),
}
}
/// Returns the normalized base mint.
pub fn base_mint(&self) -> &str {
match self {
crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.base_mint.as_str(),
}
}
/// Returns the normalized quote mint.
pub fn quote_mint(&self) -> &str {
match self {
crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => event.quote_mint.as_str(),
}
}
/// Converts the decoded event to JSON payload.
pub fn to_payload_json(&self) -> std::option::Option<std::string::String> {
match self {
crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => {
let result = serde_json::to_string(event);
match result {
Ok(payload_json) => Some(payload_json),
Err(_) => None,
}
}
}
}
}
/// Decoded Raydium CLMM swap_v2 instruction.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct KbRaydiumClmmSwapV2Decoded {
/// User performing the swap.
pub payer: std::string::String,
/// AMM config account.
pub amm_config: std::string::String,
/// CLMM pool state account.
pub pool_state: std::string::String,
/// User input token account.
pub input_token_account: std::string::String,
/// User output token account.
pub output_token_account: std::string::String,
/// Pool input vault.
pub input_vault: std::string::String,
/// Pool output vault.
pub output_vault: std::string::String,
/// Pool oracle observation state.
pub observation_state: std::string::String,
/// Input vault mint.
pub input_vault_mint: std::string::String,
/// Output vault mint.
pub output_vault_mint: std::string::String,
/// Canonical base mint.
pub base_mint: std::string::String,
/// Canonical quote mint.
pub quote_mint: std::string::String,
/// Canonical base vault.
pub base_vault: std::string::String,
/// Canonical quote vault.
pub quote_vault: std::string::String,
/// Trade side relative to the canonical base mint.
#[serde(rename = "tradeSide")]
pub trade_side: std::string::String,
/// Amount argument.
pub amount: u64,
/// Other amount threshold argument.
pub other_amount_threshold: u64,
/// Sqrt price limit as decimal string.
pub sqrt_price_limit_x64: std::string::String,
/// Whether the instruction uses exact input mode.
pub is_base_input: bool,
}
/// Decodes a Raydium CLMM instruction.
pub fn kb_decode_raydium_clmm_instruction(
accounts_json: &str,
data_json: &str,
) -> std::vec::Vec<crate::KbRaydiumClmmDecodedEvent> {
let mut decoded = std::vec::Vec::new();
let accounts_result = serde_json::from_str::<std::vec::Vec<std::string::String>>(accounts_json);
let accounts = match accounts_result {
Ok(accounts) => accounts,
Err(_) => return decoded,
};
let data_base58_result = serde_json::from_str::<std::string::String>(data_json);
let data_base58 = match data_base58_result {
Ok(data_base58) => data_base58,
Err(_) => data_json.to_string(),
};
let data_option = kb_decode_base58(data_base58.as_str());
let data = match data_option {
Some(data) => data,
None => return decoded,
};
if data.len() < 41 {
return decoded;
}
let discriminator_option = kb_read_discriminator(data.as_slice());
let discriminator = match discriminator_option {
Some(discriminator) => discriminator,
None => return decoded,
};
if discriminator == KB_RAYDIUM_CLMM_SWAP_LEGACY_DISCRIMINATOR {
return decoded;
}
if discriminator != KB_RAYDIUM_CLMM_SWAP_V2_DISCRIMINATOR {
return decoded;
}
let event_option = kb_decode_swap_v2(accounts.as_slice(), data.as_slice());
let event = match event_option {
Some(event) => event,
None => return decoded,
};
decoded.push(crate::KbRaydiumClmmDecodedEvent::SwapV2(event));
decoded
}
fn kb_decode_swap_v2(
accounts: &[std::string::String],
data: &[u8],
) -> std::option::Option<crate::KbRaydiumClmmSwapV2Decoded> {
let payer = match kb_clone_account(accounts, 0) {
Some(value) => value,
None => return None,
};
let amm_config = match kb_clone_account(accounts, 1) {
Some(value) => value,
None => return None,
};
let pool_state = match kb_clone_account(accounts, 2) {
Some(value) => value,
None => return None,
};
let input_token_account = match kb_clone_account(accounts, 3) {
Some(value) => value,
None => return None,
};
let output_token_account = match kb_clone_account(accounts, 4) {
Some(value) => value,
None => return None,
};
let input_vault = match kb_clone_account(accounts, 5) {
Some(value) => value,
None => return None,
};
let output_vault = match kb_clone_account(accounts, 6) {
Some(value) => value,
None => return None,
};
let observation_state = match kb_clone_account(accounts, 7) {
Some(value) => value,
None => return None,
};
let input_vault_mint = match kb_clone_account(accounts, 11) {
Some(value) => value,
None => return None,
};
let output_vault_mint = match kb_clone_account(accounts, 12) {
Some(value) => value,
None => return None,
};
let amount = match kb_read_u64_le(data, 8) {
Some(value) => value,
None => return None,
};
let other_amount_threshold = match kb_read_u64_le(data, 16) {
Some(value) => value,
None => return None,
};
let sqrt_price_limit_x64 = match kb_read_u128_le(data, 24) {
Some(value) => value,
None => return None,
};
let is_base_input = match kb_read_bool(data, 40) {
Some(value) => value,
None => return None,
};
let mut base_mint = input_vault_mint.clone();
let mut quote_mint = output_vault_mint.clone();
let mut base_vault = input_vault.clone();
let mut quote_vault = output_vault.clone();
let mut trade_side = "SellBase".to_string();
if output_vault_mint.as_str() < input_vault_mint.as_str() {
base_mint = output_vault_mint.clone();
quote_mint = input_vault_mint.clone();
base_vault = output_vault.clone();
quote_vault = input_vault.clone();
trade_side = "BuyBase".to_string();
}
Some(crate::KbRaydiumClmmSwapV2Decoded {
payer,
amm_config,
pool_state,
input_token_account,
output_token_account,
input_vault,
output_vault,
observation_state,
input_vault_mint,
output_vault_mint,
base_mint,
quote_mint,
base_vault,
quote_vault,
trade_side,
amount,
other_amount_threshold,
sqrt_price_limit_x64: sqrt_price_limit_x64.to_string(),
is_base_input,
})
}
fn kb_clone_account(
accounts: &[std::string::String],
index: usize,
) -> std::option::Option<std::string::String> {
let account_option = accounts.get(index);
match account_option {
Some(account) => Some(account.clone()),
None => None,
}
}
fn kb_read_discriminator(data: &[u8]) -> std::option::Option<[u8; 8]> {
if data.len() < 8 {
return None;
}
let mut bytes = [0_u8; 8];
let mut index = 0_usize;
while index < 8 {
bytes[index] = data[index];
index += 1;
}
Some(bytes)
}
fn kb_read_u64_le(data: &[u8], offset: usize) -> std::option::Option<u64> {
if data.len() < offset + 8 {
return None;
}
let mut bytes = [0_u8; 8];
let mut index = 0_usize;
while index < 8 {
bytes[index] = data[offset + index];
index += 1;
}
Some(u64::from_le_bytes(bytes))
}
fn kb_read_u128_le(data: &[u8], offset: usize) -> std::option::Option<u128> {
if data.len() < offset + 16 {
return None;
}
let mut bytes = [0_u8; 16];
let mut index = 0_usize;
while index < 16 {
bytes[index] = data[offset + index];
index += 1;
}
Some(u128::from_le_bytes(bytes))
}
fn kb_read_bool(data: &[u8], offset: usize) -> std::option::Option<bool> {
if data.len() <= offset {
return None;
}
match data[offset] {
0 => Some(false),
1 => Some(true),
_ => None,
}
}
fn kb_decode_base58(input: &str) -> std::option::Option<std::vec::Vec<u8>> {
let alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz".as_bytes();
let mut bytes: std::vec::Vec<u8> = std::vec::Vec::new();
for input_byte in input.bytes() {
let mut value_option = None;
let mut alphabet_index = 0_usize;
while alphabet_index < alphabet.len() {
if alphabet[alphabet_index] == input_byte {
value_option = Some(alphabet_index as u32);
break;
}
alphabet_index += 1;
}
let mut carry = match value_option {
Some(value) => value,
None => return None,
};
let mut byte_index = bytes.len();
while byte_index > 0 {
byte_index -= 1;
let value = (bytes[byte_index] as u32) * 58 + carry;
bytes[byte_index] = (value & 0xff) as u8;
carry = value >> 8;
}
while carry > 0 {
bytes.insert(0, (carry & 0xff) as u8);
carry >>= 8;
}
}
let mut leading_zero_count = 0_usize;
for input_byte in input.bytes() {
if input_byte == b'1' {
leading_zero_count += 1;
} else {
break;
}
}
let mut result = std::vec::Vec::new();
let mut index = 0_usize;
while index < leading_zero_count {
result.push(0_u8);
index += 1;
}
for byte in bytes {
result.push(byte);
}
Some(result)
}
#[cfg(test)]
mod tests {
fn sample_swap_v2_accounts_json() -> &'static str {
r#"[
"8NQ32SyFKD1d5kenq4oM8Da6C6J9TQSMW1uAgFRveEQr",
"A1BBtTYJd4i3xU8D6Tc2FzU6ZN4oXZWXKZnCxwbHXr8x",
"GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR",
"D2frZyyQ7NQaXRiEoBUM9S64Ckv7KZ7wuqupqdMhpsHy",
"H7qe6sAyEyqztyMtRrDf5J1gugLx6yuyKPy5veVmR14W",
"AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq",
"CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh",
"8QtFSxNzD3zmEX8nzQKZB83TH4WGUAkLkQoRHAw5fuhn",
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb",
"MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr",
"CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV",
"7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs",
"8ovxZR2Gv9Mr73aoXLQYTMaZvHCSpnEohzgjVHQwmyHr",
"9MssDxndh2Rn8DmGWL94hXVv22zxfDYHV7tvzfPgcaWe"
]"#
}
#[test]
fn decodes_swap_v2() {
let events = crate::kb_decode_raydium_clmm_instruction(
sample_swap_v2_accounts_json(),
r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#,
);
assert_eq!(events.len(), 1);
match &events[0] {
crate::KbRaydiumClmmDecodedEvent::SwapV2(event) => {
assert_eq!(events[0].event_kind(), "raydium_clmm.swap_v2");
assert_eq!(
events[0].pool_account(),
"GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR"
);
assert_eq!(
event.pool_state,
"GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR"
);
assert_eq!(
event.input_vault,
"AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq"
);
assert_eq!(
event.output_vault,
"CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh"
);
assert_eq!(
event.input_vault_mint,
"CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV"
);
assert_eq!(
event.output_vault_mint,
"7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs"
);
assert_eq!(
event.base_mint,
"7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs"
);
assert_eq!(
event.quote_mint,
"CKvjP8FrZpaKXjASEtX2nEU9w7M4RKskfnLQbKJBodV"
);
assert_eq!(
event.base_vault,
"CTkc4xDrpzjWcFLC1cxmUZZjZLSRV46HZa8wu5eKTbuh"
);
assert_eq!(
event.quote_vault,
"AvRzvwpSVnxsinLGQS3vZLqkZxhXZDM8F2qKccAo7rSq"
);
assert_eq!(event.trade_side, "BuyBase");
assert_eq!(event.amount, 148441657491969);
assert_eq!(event.other_amount_threshold, 0);
assert_eq!(event.sqrt_price_limit_x64, "0");
assert_eq!(event.is_base_input, true);
}
}
}
#[test]
fn serializes_swap_v2_payload_json() {
let events = crate::kb_decode_raydium_clmm_instruction(
sample_swap_v2_accounts_json(),
r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#,
);
assert_eq!(events.len(), 1);
let payload_option = events[0].to_payload_json();
let payload = match payload_option {
Some(payload) => payload,
None => panic!("payload json must be available"),
};
assert!(payload.contains("GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR"));
assert!(payload.contains("input_vault"));
assert!(payload.contains("output_vault"));
assert!(payload.contains("tradeSide"));
}
#[test]
fn ignores_invalid_data() {
let events = crate::kb_decode_raydium_clmm_instruction(
sample_swap_v2_accounts_json(),
r#""not-base58-data-0""#,
);
assert_eq!(events.len(), 0);
}
#[test]
fn ignores_incomplete_accounts() {
let accounts_json = r#"[
"8NQ32SyFKD1d5kenq4oM8Da6C6J9TQSMW1uAgFRveEQr",
"A1BBtTYJd4i3xU8D6Tc2FzU6ZN4oXZWXKZnCxwbHXr8x",
"GUrRxvnWVQSnbcz1eP9D5BqXwPZtRhmrqVfm5wY9meWR"
]"#;
let events = crate::kb_decode_raydium_clmm_instruction(
accounts_json,
r#""ASCsAbe1UnDnCsnGLPALJUXSS5JREycfhGyTzKh7xRWNyRHCqBuzR23S""#,
);
assert_eq!(events.len(), 0);
}
#[test]
fn ignores_legacy_swap_for_now() {
let mut data = std::vec::Vec::<u8>::new();
data.push(248);
data.push(198);
data.push(158);
data.push(145);
data.push(225);
data.push(117);
data.push(135);
data.push(200);
while data.len() < 41 {
data.push(0);
}
data[40] = 1;
let encoded = bs58::encode(data).into_string();
let data_json = format!("\"{}\"", encoded);
let events = crate::kb_decode_raydium_clmm_instruction(
sample_swap_v2_accounts_json(),
data_json.as_str(),
);
assert_eq!(events.len(), 0);
}
}

View File

@@ -37,6 +37,42 @@ impl KbDexDecodeService {
}
}
async fn decode_and_persist_raydium_clmm_events(
&self,
transaction: &crate::KbChainTransactionDto,
instructions: &[crate::KbChainInstructionDto],
) -> Result<std::vec::Vec<crate::KbDexDecodedEventDto>, crate::KbError> {
let mut persisted = std::vec::Vec::new();
for instruction in instructions {
let program_id = match instruction.program_id.as_ref() {
Some(program_id) => program_id,
None => continue,
};
if program_id.as_str() != crate::KB_RAYDIUM_CLMM_PROGRAM_ID {
continue;
}
let data_json = match instruction.data_json.as_ref() {
Some(data_json) => data_json,
None => continue,
};
let decoded_events = crate::kb_decode_raydium_clmm_instruction(
instruction.accounts_json.as_str(),
data_json.as_str(),
);
for decoded_event in &decoded_events {
let persist_result = self
.persist_raydium_clmm_event(transaction, instruction, decoded_event)
.await;
let persisted_event = match persist_result {
Ok(persisted_event) => persisted_event,
Err(error) => return Err(error),
};
persisted.push(persisted_event);
}
}
Ok(persisted)
}
/// Decodes one projected transaction and persists the decoded events.
pub async fn decode_transaction_by_signature(
&self,
@@ -104,6 +140,16 @@ impl KbDexDecodeService {
for persisted_event in raydium_cpmm_persisted {
persisted.push(persisted_event);
}
let raydium_clmm_persisted_result = self
.decode_and_persist_raydium_clmm_events(&transaction, &instructions)
.await;
let raydium_clmm_persisted = match raydium_clmm_persisted_result {
Ok(raydium_clmm_persisted) => raydium_clmm_persisted,
Err(error) => return Err(error),
};
for persisted_event in raydium_clmm_persisted {
persisted.push(persisted_event);
}
let pump_fun_decoded_result = self
.pump_fun_decoder
.decode_transaction(&transaction, &instructions);
@@ -248,15 +294,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbDexlabDecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"dexlab",
"dexlab.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded dexlab payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -342,15 +387,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbDexlabDecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"dexlab",
"dexlab.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded dexlab payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -445,15 +489,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbFluxbeamDecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"fluxbeam",
"fluxbeam.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded fluxbeam payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -539,15 +582,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbFluxbeamDecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"fluxbeam",
"fluxbeam.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded fluxbeam payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -642,15 +684,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbOrcaWhirlpoolsDecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"orca_whirlpools",
"orca_whirlpools.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded orca whirlpools payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -736,15 +777,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbOrcaWhirlpoolsDecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"orca_whirlpools",
"orca_whirlpools.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded orca whirlpools payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -840,15 +880,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbMeteoraDammV1DecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_damm_v1",
"meteora_damm_v1.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora damm v1 payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -934,15 +973,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbMeteoraDammV1DecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_damm_v1",
"meteora_damm_v1.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora damm v1 payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1037,15 +1075,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbMeteoraDammV2DecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_damm_v2",
"meteora_damm_v2.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora damm v2 payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1131,15 +1168,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbMeteoraDammV2DecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_damm_v2",
"meteora_damm_v2.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora damm v2 payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1234,15 +1270,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbMeteoraDbcDecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_dbc",
"meteora_dbc.create_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora dbc payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1328,15 +1363,14 @@ impl KbDexDecodeService {
Ok(fetched)
}
crate::KbMeteoraDbcDecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"meteora_dbc",
"meteora_dbc.swap",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded meteora dbc payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1431,15 +1465,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbRaydiumAmmV4DecodedEvent::Initialize2Pool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"raydium_amm_v4",
"raydium_amm_v4.initialize2_pool",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded raydium payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1563,6 +1596,141 @@ impl KbDexDecodeService {
Ok(persisted)
}
async fn persist_raydium_clmm_event(
&self,
transaction: &crate::KbChainTransactionDto,
instruction: &crate::KbChainInstructionDto,
decoded_event: &crate::KbRaydiumClmmDecodedEvent,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
let transaction_id = match transaction.id {
Some(transaction_id) => transaction_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"transaction '{}' has no internal id",
transaction.signature
)));
}
};
let instruction_id = match instruction.id {
Some(instruction_id) => instruction_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"raydium clmm instruction for transaction '{}' has no internal id",
transaction.signature
)));
}
};
let event_kind = decoded_event.event_kind().to_string();
let raw_payload_json = match decoded_event.to_payload_json() {
Some(payload_json) => payload_json,
None => {
return Err(crate::KbError::Json(
"cannot serialize decoded raydium clmm payload".to_string(),
));
}
};
let payload_json_result = kb_enrich_serialized_dex_decoded_payload(
"raydium_clmm",
event_kind.as_str(),
raw_payload_json.as_str(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
transaction_id,
Some(instruction_id),
event_kind.as_str(),
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::KbDexDecodedEventDto::new(
transaction_id,
Some(instruction_id),
"raydium_clmm".to_string(),
crate::KB_RAYDIUM_CLMM_PROGRAM_ID.to_string(),
event_kind.clone(),
Some(decoded_event.pool_account().to_string()),
None,
Some(decoded_event.base_mint().to_string()),
Some(decoded_event.quote_mint().to_string()),
None,
payload_json.clone(),
);
let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
transaction_id,
Some(instruction_id),
event_kind.as_str(),
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::KbError::InvalidState(
"decoded raydium clmm event disappeared after upsert".to_string(),
));
}
};
if !already_present {
let payload_value_result =
serde_json::from_str::<serde_json::Value>(payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse raydium clmm payload after serialization: {}",
error
)));
}
};
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
format!("dex.{}", event_kind),
crate::KbObservationSourceKind::HttpRpc,
transaction.source_endpoint_name.clone(),
transaction.signature.clone(),
transaction.slot,
payload_value.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
format!("signal.dex.{}", event_kind),
crate::KbAnalysisSignalSeverity::Low,
transaction.signature.clone(),
Some(observation_id),
None,
payload_value,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(fetched)
}
async fn persist_raydium_cpmm_event(
&self,
transaction: &crate::KbChainTransactionDto,
@@ -1587,7 +1755,8 @@ impl KbDexDecodeService {
)));
}
};
let payload_json = match decoded_event.to_payload_json() {
let event_kind = decoded_event.event_kind().to_string();
let raw_payload_json = match decoded_event.to_payload_json() {
Some(payload_json) => payload_json,
None => {
return Err(crate::KbError::Json(
@@ -1595,7 +1764,15 @@ impl KbDexDecodeService {
));
}
};
let event_kind = decoded_event.event_kind().to_string();
let payload_json_result = kb_enrich_serialized_dex_decoded_payload(
"raydium_cpmm",
event_kind.as_str(),
raw_payload_json.as_str(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
transaction_id,
@@ -1696,15 +1873,14 @@ impl KbDexDecodeService {
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbPumpFunDecodedEvent::CreateV2Token(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"pump_fun",
"pump_fun.create_v2_token",
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded pump.fun payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1820,15 +1996,14 @@ impl KbDexDecodeService {
signal_kind: &str,
observation_kind: &str,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"pump_fun",
event_kind,
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded pump.fun trade payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -1950,15 +2125,14 @@ impl KbDexDecodeService {
signal_kind: &str,
observation_kind: &str,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json_result = kb_enrich_and_serialize_dex_decoded_payload(
"pump_swap",
event_kind,
event.payload_json.clone(),
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded pump swap payload: {}",
error
)));
}
Err(error) => return Err(error),
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
@@ -2044,8 +2218,258 @@ impl KbDexDecodeService {
}
}
// Classifies a DEX event kind into a stable business category.
fn kb_classify_dex_event_category(event_kind: &str) -> &'static str {
if kb_is_dex_reward_event_kind(event_kind) {
return "reward";
}
if kb_is_dex_fee_event_kind(event_kind) {
return "fee";
}
if kb_is_dex_liquidity_event_kind(event_kind) {
return "liquidity";
}
if kb_is_dex_pool_lifecycle_event_kind(event_kind) {
return "pool_lifecycle";
}
if kb_is_dex_admin_event_kind(event_kind) {
return "admin";
}
if kb_is_dex_trade_event_kind(event_kind) {
return "trade";
}
"unknown"
}
// Returns true when the event kind represents a swap-like event.
fn kb_is_dex_trade_event_kind(event_kind: &str) -> bool {
if event_kind.ends_with(".buy") {
return true;
}
if event_kind.ends_with(".sell") {
return true;
}
if event_kind.ends_with(".swap") {
return true;
}
if event_kind.contains(".swap_") {
return true;
}
false
}
// Returns true when the event kind can directly produce a candle candidate.
fn kb_is_dex_candle_candidate_event_kind(event_kind: &str) -> bool {
if event_kind.contains("router") {
return false;
}
if event_kind.contains("route") {
return false;
}
kb_is_dex_trade_event_kind(event_kind)
}
// Returns true for liquidity lifecycle changes that must not become candles.
fn kb_is_dex_liquidity_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".deposit") {
return true;
}
if event_kind.contains(".withdraw") {
return true;
}
if event_kind.contains(".increase_liquidity") {
return true;
}
if event_kind.contains(".decrease_liquidity") {
return true;
}
if event_kind.contains(".open_position") {
return true;
}
if event_kind.contains(".close_position") {
return true;
}
false
}
// Returns true for fee collection events.
fn kb_is_dex_fee_event_kind(event_kind: &str) -> bool {
if event_kind.contains("collect_creator_fee") {
return true;
}
if event_kind.contains("collect_protocol_fee") {
return true;
}
if event_kind.contains("collect_fund_fee") {
return true;
}
if event_kind.contains("collect_fee") {
return true;
}
false
}
// Returns true for reward/incentive events.
fn kb_is_dex_reward_event_kind(event_kind: &str) -> bool {
if event_kind.contains("reward") {
return true;
}
if event_kind.contains("emission") {
return true;
}
false
}
// Returns true for pool creation / initialization / migration events.
fn kb_is_dex_pool_lifecycle_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".initialize") {
return true;
}
if event_kind.contains(".initialize_with_permission") {
return true;
}
if event_kind.contains(".create_pool") {
return true;
}
if event_kind.contains(".create_v2_token") {
return true;
}
if event_kind.contains(".migrate") {
return true;
}
false
}
// Returns true for admin/config/permission changes.
fn kb_is_dex_admin_event_kind(event_kind: &str) -> bool {
if event_kind.contains("admin") {
return true;
}
if event_kind.contains("config") {
return true;
}
if event_kind.contains("permission") {
return true;
}
if event_kind.contains("set_") {
return true;
}
if event_kind.contains("update_") {
return true;
}
false
}
// Enriches a decoded payload with non-destructive classification metadata.
fn kb_enrich_dex_decoded_payload(
protocol_name: &str,
event_kind: &str,
payload_json: serde_json::Value,
) -> serde_json::Value {
let event_category = kb_classify_dex_event_category(event_kind);
let trade_candidate = kb_is_dex_trade_event_kind(event_kind);
let candle_candidate = kb_is_dex_candle_candidate_event_kind(event_kind);
let mut object = match payload_json {
serde_json::Value::Object(object) => object,
other => {
let mut object = serde_json::Map::new();
object.insert("rawPayload".to_owned(), other);
object
}
};
kb_json_insert_string_if_missing(&mut object, "protocolName", protocol_name);
kb_json_insert_string_if_missing(&mut object, "eventKind", event_kind);
kb_json_insert_string_if_missing(&mut object, "eventCategory", event_category);
kb_json_insert_bool_if_missing(&mut object, "tradeCandidate", trade_candidate);
kb_json_insert_bool_if_missing(&mut object, "candleCandidate", candle_candidate);
kb_json_insert_i64_if_missing(&mut object, "eventClassificationVersion", 1);
if !trade_candidate {
kb_json_insert_string_if_missing(&mut object, "skipTradeReason", "non_trade_event");
} else if !candle_candidate {
kb_json_insert_string_if_missing(
&mut object,
"skipCandleReason",
"route_or_multihop_event_requires_leg_resolution",
);
}
serde_json::Value::Object(object)
}
// Inserts a string JSON property without overriding existing decoded data.
fn kb_json_insert_string_if_missing(
object: &mut serde_json::Map<String, serde_json::Value>,
key: &str,
value: &str,
) {
if object.contains_key(key) {
return;
}
object.insert(key.to_owned(), serde_json::Value::String(value.to_owned()));
}
// Inserts a bool JSON property without overriding existing decoded data.
fn kb_json_insert_bool_if_missing(
object: &mut serde_json::Map<String, serde_json::Value>,
key: &str,
value: bool,
) {
if object.contains_key(key) {
return;
}
object.insert(key.to_owned(), serde_json::Value::Bool(value));
}
// Inserts an i64 JSON property without overriding existing decoded data.
fn kb_json_insert_i64_if_missing(
object: &mut serde_json::Map<String, serde_json::Value>,
key: &str,
value: i64,
) {
if object.contains_key(key) {
return;
}
object.insert(
key.to_owned(),
serde_json::Value::Number(serde_json::Number::from(value)),
);
}
fn kb_enrich_and_serialize_dex_decoded_payload(
protocol_name: &str,
event_kind: &str,
payload_json: serde_json::Value,
) -> Result<String, crate::KbError> {
let enriched_payload = kb_enrich_dex_decoded_payload(protocol_name, event_kind, payload_json);
let payload_json_result = serde_json::to_string(&enriched_payload);
match payload_json_result {
Ok(payload_json) => Ok(payload_json),
Err(error) => Err(crate::KbError::Json(format!(
"cannot serialize enriched decoded payload for '{}': {}",
event_kind, error
))),
}
}
fn kb_enrich_serialized_dex_decoded_payload(
protocol_name: &str,
event_kind: &str,
payload_json: &str,
) -> Result<String, crate::KbError> {
let payload_value_result = serde_json::from_str::<serde_json::Value>(payload_json);
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse decoded payload for '{}': {}",
event_kind, error
)));
}
};
kb_enrich_and_serialize_dex_decoded_payload(protocol_name, event_kind, payload_value)
}
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc<crate::KbDatabase> {
let tempdir_result = tempfile::tempdir();
let tempdir = match tempdir_result {
@@ -2836,4 +3260,151 @@ mod tests {
Some("So11111111111111111111111111111111111111112".to_string())
);
}
#[test]
fn classifies_swap_events_as_trade_candidates() {
assert_eq!(
super::kb_classify_dex_event_category("raydium_cpmm.swap_base_input"),
"trade"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_cpmm.swap_base_output"),
"trade"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.swap"),
"trade"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.swap_v2"),
"trade"
);
assert_eq!(
super::kb_classify_dex_event_category("pump_fun.buy"),
"trade"
);
assert!(super::kb_is_dex_trade_event_kind(
"raydium_cpmm.swap_base_input"
));
assert!(super::kb_is_dex_candle_candidate_event_kind(
"raydium_cpmm.swap_base_input"
));
}
#[test]
fn classifies_router_swap_as_trade_but_not_direct_candle_candidate() {
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.swap_router_base_in"),
"trade"
);
assert!(super::kb_is_dex_trade_event_kind(
"raydium_clmm.swap_router_base_in"
));
assert!(!super::kb_is_dex_candle_candidate_event_kind(
"raydium_clmm.swap_router_base_in"
));
}
#[test]
fn classifies_fee_reward_liquidity_and_lifecycle_events() {
assert_eq!(
super::kb_classify_dex_event_category("raydium_cpmm.collect_creator_fee"),
"fee"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.collect_protocol_fee"),
"fee"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.set_reward_params"),
"reward"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_clmm.increase_liquidity_v2"),
"liquidity"
);
assert_eq!(
super::kb_classify_dex_event_category("raydium_cpmm.initialize"),
"pool_lifecycle"
);
}
#[test]
fn enriches_payload_without_overriding_existing_fields() {
let payload_json = serde_json::json!({
"eventCategory": "custom",
"amountIn": "10"
});
let enriched_payload = super::kb_enrich_dex_decoded_payload(
"raydium_cpmm",
"raydium_cpmm.swap_base_input",
payload_json,
);
let object_option = enriched_payload.as_object();
let object = match object_option {
Some(object) => object,
None => {
panic!("expected enriched payload object");
}
};
assert_eq!(
object.get("eventCategory"),
Some(&serde_json::Value::String("custom".to_owned()))
);
assert_eq!(
object.get("protocolName"),
Some(&serde_json::Value::String("raydium_cpmm".to_owned()))
);
assert_eq!(
object.get("eventKind"),
Some(&serde_json::Value::String(
"raydium_cpmm.swap_base_input".to_owned()
))
);
assert_eq!(
object.get("tradeCandidate"),
Some(&serde_json::Value::Bool(true))
);
assert_eq!(
object.get("candleCandidate"),
Some(&serde_json::Value::Bool(true))
);
}
#[test]
fn enriches_non_object_payload_as_raw_payload() {
let payload_json = serde_json::Value::String("raw".to_owned());
let enriched_payload = super::kb_enrich_dex_decoded_payload(
"raydium_clmm",
"raydium_clmm.collect_protocol_fee",
payload_json,
);
let object_option = enriched_payload.as_object();
let object = match object_option {
Some(object) => object,
None => {
panic!("expected enriched payload object");
}
};
assert_eq!(
object.get("rawPayload"),
Some(&serde_json::Value::String("raw".to_owned()))
);
assert_eq!(
object.get("eventCategory"),
Some(&serde_json::Value::String("fee".to_owned()))
);
assert_eq!(
object.get("tradeCandidate"),
Some(&serde_json::Value::Bool(false))
);
assert_eq!(
object.get("candleCandidate"),
Some(&serde_json::Value::Bool(false))
);
assert_eq!(
object.get("skipTradeReason"),
Some(&serde_json::Value::String("non_trade_event".to_owned()))
);
}
}

View File

@@ -106,6 +106,18 @@ impl KbDexDetectService {
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "raydium_clmm"
&& decoded_event.event_kind == "raydium_clmm.swap_v2"
{
let detect_result = self
.detect_raydium_clmm_trade(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "pump_fun"
&& decoded_event.event_kind == "pump_fun.create_v2_token"
{
@@ -2747,6 +2759,214 @@ impl KbDexDetectService {
created_listing,
})
}
async fn detect_raydium_clmm_trade(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbDexDecodedEventDto,
) -> Result<crate::KbDexPoolDetectionResult, crate::KbError> {
let decoded_event_id = match decoded_event.id {
Some(decoded_event_id) => decoded_event_id,
None => {
return Err(crate::KbError::InvalidState(
"decoded dex event has no internal id".to_string(),
));
}
};
let dex_id_result = self.ensure_raydium_clmm_dex().await;
let dex_id = match dex_id_result {
Ok(dex_id) => dex_id,
Err(error) => return Err(error),
};
let pool_address = match decoded_event.pool_account.clone() {
Some(pool_address) => pool_address,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no pool_account",
decoded_event_id
)));
}
};
let base_mint = match decoded_event.token_a_mint.clone() {
Some(base_mint) => base_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_a_mint",
decoded_event_id
)));
}
};
let quote_mint = match decoded_event.token_b_mint.clone() {
Some(quote_mint) => quote_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_b_mint",
decoded_event_id
)));
}
};
let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => return Err(error),
};
let base_vault_address = kb_extract_payload_string_field(&payload_value, "base_vault");
let quote_vault_address = kb_extract_payload_string_field(&payload_value, "quote_vault");
let base_token_id_result = self.ensure_token(base_mint.as_str()).await;
let base_token_id = match base_token_id_result {
Ok(base_token_id) => base_token_id,
Err(error) => return Err(error),
};
let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await;
let quote_token_id = match quote_token_id_result {
Ok(quote_token_id) => quote_token_id,
Err(error) => return Err(error),
};
let existing_pool_result =
crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await;
let existing_pool_option = match existing_pool_result {
Ok(existing_pool_option) => existing_pool_option,
Err(error) => return Err(error),
};
let created_pool = existing_pool_option.is_none();
let pool_id = match existing_pool_option {
Some(pool) => match pool.id {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
}
},
None => {
let pool_dto = crate::KbPoolDto::new(
dex_id,
pool_address.clone(),
crate::KbPoolKind::Clmm,
crate::KbPoolStatus::Active,
);
let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await;
match upsert_result {
Ok(pool_id) => pool_id,
Err(error) => return Err(error),
}
}
};
let existing_pair_result =
crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_pair_option = match existing_pair_result {
Ok(existing_pair_option) => existing_pair_option,
Err(error) => return Err(error),
};
let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_dto =
crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol);
let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
let pair_id = match pair_id_result {
Ok(pair_id) => pair_id,
Err(error) => return Err(error),
};
let upsert_base_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
base_token_id,
crate::KbPoolTokenRole::Base,
base_vault_address,
Some(0),
),
)
.await;
if let Err(error) = upsert_base_pool_token_result {
return Err(error);
}
let upsert_quote_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Quote,
quote_vault_address,
Some(1),
),
)
.await;
if let Err(error) = upsert_quote_pool_token_result {
return Err(error);
}
let existing_listing_result =
crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_listing_option = match existing_listing_result {
Ok(existing_listing_option) => existing_listing_option,
Err(error) => return Err(error),
};
let created_listing = existing_listing_option.is_none();
let pool_listing_id = match existing_listing_option {
Some(pool_listing) => pool_listing.id,
None => {
let listing_id_result = self
.upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction)
.await;
match listing_id_result {
Ok(listing_id) => Some(listing_id),
Err(error) => return Err(error),
}
}
};
if created_pool {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_clmm.new_pool",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_pair {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_clmm.new_pair",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_listing {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.raydium_clmm.first_listing_seen",
crate::KbAnalysisSignalSeverity::Low,
payload_value,
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(crate::KbDexPoolDetectionResult {
decoded_event_id,
dex_id,
pool_id,
pair_id,
pool_listing_id,
created_pool,
created_pair,
created_listing,
})
}
async fn detect_raydium_cpmm_trade(
&self,
@@ -2980,6 +3200,32 @@ impl KbDexDetectService {
}
}
async fn ensure_raydium_clmm_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "raydium_clmm").await;
let dex_option = match dex_result {
Ok(dex_option) => dex_option,
Err(error) => return Err(error),
};
match dex_option {
Some(dex) => match dex.id {
Some(dex_id) => Ok(dex_id),
None => Err(crate::KbError::InvalidState(
"raydium_clmm dex has no internal id".to_string(),
)),
},
None => {
let dex_dto = crate::KbDexDto::new(
"raydium_clmm".to_string(),
"Raydium CLMM".to_string(),
Some(crate::KB_RAYDIUM_CLMM_PROGRAM_ID.to_string()),
None,
true,
);
crate::upsert_dex(self.database.as_ref(), &dex_dto).await
}
}
}
async fn ensure_dexlab_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "dexlab").await;
let dex_option = match dex_result {

View File

@@ -236,6 +236,7 @@ pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use dex::KB_PUMP_FUN_PROGRAM_ID;
pub use dex::KB_PUMP_SWAP_PROGRAM_ID;
pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use dex::KB_RAYDIUM_CLMM_PROGRAM_ID;
pub use dex::KB_RAYDIUM_CPMM_PROGRAM_ID;
pub use dex::KbDexlabCreatePoolDecoded;
pub use dex::KbDexlabDecodedEvent;
@@ -271,9 +272,12 @@ pub use dex::KbPumpSwapTradeDecoded;
pub use dex::KbRaydiumAmmV4DecodedEvent;
pub use dex::KbRaydiumAmmV4Decoder;
pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use dex::KbRaydiumClmmDecodedEvent;
pub use dex::KbRaydiumClmmSwapV2Decoded;
pub use dex::KbRaydiumCpmmDecodedEvent;
pub use dex::KbRaydiumCpmmSwapDecoded;
pub use dex::KbRaydiumCpmmSwapMode;
pub use dex::kb_decode_raydium_clmm_instruction;
pub use dex::kb_decode_raydium_cpmm_instruction;
pub use dex_decode::KbDexDecodeService;
pub use dex_detect::KbDexDetectService;

View File

@@ -80,14 +80,23 @@ impl KbPairCandleAggregationService {
let mut seen = std::collections::HashSet::<(i64, i64, i64)>::new();
let mut results = std::vec::Vec::new();
for trade_event in &trade_events {
let event_time_option =
kb_extract_trade_event_unix_time(self.database.as_ref(), trade_event).await?;
let event_time_option_result =
kb_extract_trade_event_unix_time(self.database.as_ref(), trade_event).await;
let event_time_option = match event_time_option_result {
Ok(event_time_option) => event_time_option,
Err(error) => return Err(error),
};
let event_time_unix = match event_time_option {
Some(event_time_unix) => event_time_unix,
None => continue,
};
for timeframe_seconds in &materialized_timeframes {
let bucket_start_unix = kb_bucket_start_unix(event_time_unix, *timeframe_seconds)?;
let bucket_start_unix_result =
kb_bucket_start_unix(event_time_unix, *timeframe_seconds);
let bucket_start_unix = match bucket_start_unix_result {
Ok(bucket_start_unix) => bucket_start_unix,
Err(error) => return Err(error),
};
let dedupe_key = (trade_event.pair_id, *timeframe_seconds, bucket_start_unix);
if seen.contains(&dedupe_key) {
continue;
@@ -199,7 +208,12 @@ pub(crate) async fn kb_build_candle_from_trade_events(
if trade_event.pair_id != pair_id {
continue;
}
let event_time_option = kb_extract_trade_event_unix_time(database, trade_event).await?;
let event_time_option_result =
kb_extract_trade_event_unix_time(database, trade_event).await;
let event_time_option = match event_time_option_result {
Ok(event_time_option) => event_time_option,
Err(error) => return Err(error),
};
let event_time_unix = match event_time_option {
Some(event_time_unix) => event_time_unix,
None => continue,

View File

@@ -150,12 +150,34 @@ impl KbTradeAggregationService {
let payload = match payload_result {
Ok(payload) => payload,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse decoded_event payload_json '{}': {}",
decoded_event.payload_json, error
)));
tracing::warn!(
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
decoded_event_id = ?decoded_event.id,
error = %error,
"skipping decoded event with invalid payload_json"
);
continue;
}
};
if !kb_is_decoded_event_trade_candidate(decoded_event.event_kind.as_str(), &payload) {
tracing::debug!(
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
decoded_event_id = ?decoded_event.id,
"skipping non-trade decoded event"
);
continue;
}
if !kb_is_decoded_event_candle_candidate(decoded_event.event_kind.as_str(), &payload) {
tracing::debug!(
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
decoded_event_id = ?decoded_event.id,
"skipping non-candle decoded trade candidate"
);
continue;
}
let trade_side = kb_extract_trade_side(decoded_event.event_kind.as_str(), &payload);
let mut base_amount_raw = kb_extract_amount_string(
&payload,
@@ -253,6 +275,31 @@ impl KbTradeAggregationService {
price_quote_per_base = inferred.2;
}
}
if decoded_event.event_kind.starts_with("raydium_clmm.")
&& (base_amount_raw.is_none()
|| quote_amount_raw.is_none()
|| price_quote_per_base.is_none())
{
let inferred_result = kb_extract_trade_amounts_from_vault_balance_deltas(
transaction.transaction_json.as_str(),
transaction.meta_json.as_deref(),
base_vault_address.as_deref(),
quote_vault_address.as_deref(),
);
let inferred = match inferred_result {
Ok(inferred) => inferred,
Err(error) => return Err(error),
};
if base_amount_raw.is_none() {
base_amount_raw = inferred.0;
}
if quote_amount_raw.is_none() {
quote_amount_raw = inferred.1;
}
if price_quote_per_base.is_none() {
price_quote_per_base = inferred.2;
}
}
if price_quote_per_base.is_none() {
price_quote_per_base = kb_compute_price_quote_per_base_with_decimals(
transaction.meta_json.as_deref(),
@@ -267,6 +314,23 @@ impl KbTradeAggregationService {
quote_amount_raw.as_deref(),
);
}
if !kb_is_priced_trade_event(
base_amount_raw.as_deref(),
quote_amount_raw.as_deref(),
price_quote_per_base,
) {
tracing::debug!(
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
decoded_event_id = ?decoded_event.id,
transaction_signature = %transaction.signature,
base_amount_raw = ?base_amount_raw,
quote_amount_raw = ?quote_amount_raw,
price_quote_per_base = ?price_quote_per_base,
"skipping unpriced trade aggregation candidate"
);
continue;
}
let slot_i64 = kb_convert_slot_to_i64(transaction.slot);
let created_trade_event = existing_trade_option.is_none();
let trade_event_dto = crate::KbTradeEventDto::new(
@@ -404,6 +468,123 @@ impl KbTradeAggregationService {
}
}
fn kb_is_decoded_event_trade_candidate(event_kind: &str, payload: &serde_json::Value) -> bool {
let trade_candidate_option = kb_extract_top_level_bool_by_candidate_keys(
payload,
&["tradeCandidate", "trade_candidate"],
);
if let Some(trade_candidate) = trade_candidate_option {
return trade_candidate;
}
let event_category_option =
kb_extract_string_by_candidate_keys(payload, &["eventCategory", "event_category"]);
if let Some(event_category) = event_category_option {
return event_category.as_str() == "trade";
}
kb_is_trade_event_kind(event_kind)
}
fn kb_is_decoded_event_candle_candidate(event_kind: &str, payload: &serde_json::Value) -> bool {
let candle_candidate_option = kb_extract_top_level_bool_by_candidate_keys(
payload,
&["candleCandidate", "candle_candidate"],
);
if let Some(candle_candidate) = candle_candidate_option {
return candle_candidate;
}
if !kb_is_decoded_event_trade_candidate(event_kind, payload) {
return false;
}
kb_is_trade_event_kind(event_kind)
}
fn kb_extract_top_level_bool_by_candidate_keys(
payload: &serde_json::Value,
candidate_keys: &[&str],
) -> std::option::Option<bool> {
let object = match payload.as_object() {
Some(object) => object,
None => return None,
};
for candidate_key in candidate_keys {
let value_option = object.get(*candidate_key);
let value = match value_option {
Some(value) => value,
None => continue,
};
if let Some(value_bool) = value.as_bool() {
return Some(value_bool);
}
if let Some(value_i64) = value.as_i64() {
return Some(value_i64 != 0);
}
if let Some(value_u64) = value.as_u64() {
return Some(value_u64 != 0);
}
if let Some(value_text) = value.as_str() {
let normalized = value_text.trim().to_ascii_lowercase();
if normalized.as_str() == "true" {
return Some(true);
}
if normalized.as_str() == "false" {
return Some(false);
}
if normalized.as_str() == "1" {
return Some(true);
}
if normalized.as_str() == "0" {
return Some(false);
}
}
}
None
}
fn kb_is_priced_trade_event(
base_amount_raw: std::option::Option<&str>,
quote_amount_raw: std::option::Option<&str>,
price_quote_per_base: std::option::Option<f64>,
) -> bool {
let base_amount_raw = match base_amount_raw {
Some(base_amount_raw) => base_amount_raw.trim(),
None => return false,
};
if base_amount_raw.is_empty() {
return false;
}
let base_amount_result = base_amount_raw.parse::<i128>();
let base_amount = match base_amount_result {
Ok(base_amount) => base_amount,
Err(_) => return false,
};
if base_amount <= 0 {
return false;
}
let quote_amount_raw = match quote_amount_raw {
Some(quote_amount_raw) => quote_amount_raw.trim(),
None => return false,
};
if quote_amount_raw.is_empty() {
return false;
}
let quote_amount_result = quote_amount_raw.parse::<i128>();
let quote_amount = match quote_amount_result {
Ok(quote_amount) => quote_amount,
Err(_) => return false,
};
if quote_amount <= 0 {
return false;
}
let price = match price_quote_per_base {
Some(price) => price,
None => return false,
};
if !price.is_finite() {
return false;
}
price > 0.0
}
fn kb_is_trade_event_kind(event_kind: &str) -> bool {
if event_kind.ends_with(".swap") {
return true;
@@ -420,6 +601,18 @@ fn kb_is_trade_event_kind(event_kind: &str) -> bool {
if event_kind == "raydium_cpmm.swap_base_output" {
return true;
}
if event_kind == "raydium_clmm.swap_v2" {
return true;
}
if event_kind == "raydium_clmm.swap_router_base_in" {
return true;
}
if event_kind == "raydium_clmm.swap_router_base_out" {
return true;
}
if event_kind == "raydium_clmm.exact_output" {
return true;
}
false
}
@@ -1240,4 +1433,30 @@ mod tests {
};
assert_eq!(pair_metric.trade_count, 1);
}
#[test]
fn kb_is_priced_trade_event_rejects_unpriced_values() {
let result = super::kb_is_priced_trade_event(None, Some("2500"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), None, Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), None);
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("0"), Some("2500"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("0"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("-1"), Some("2500"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("-1"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("abc"), Some("2500"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("abc"), Some(2.5));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), Some(0.0));
assert!(!result);
let result = super::kb_is_priced_trade_event(Some("1000"), Some("2500"), Some(f64::NAN));
assert!(!result);
}
}