diff --git a/Cargo.toml b/Cargo.toml index a8e2b5e..e71c0ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,26 +19,27 @@ publish = false argon2 = { version = "^0.5", features = ["std", "zeroize"] } async-trait = { version = "^0.1", features = [] } base64 = { version = "^0.22", features = [] } +bs58 = {version = "^0.5", features = ["default", "cb58", "check"] } chacha20poly1305 = { version = "^0.10", features = ["std", "stream"] } chrono = { version = "^0.4", features = ["serde"] } fs2 = { version = "^0.4", features = [] } -futures-util = { version = "^0.3", features = ["default", "std" ,"futures-sink"] } +futures-util = { version = "^0.3", features = ["default" ,"futures-sink"] } jsonschema = { version = "^0.46", features = [] } rand = { version = "^0.10", features = ["std", "serde", "sys_rng"] } reqwest = { version = "^0.13", default-features = false, features = ["charset", "cookies", "deflate", "form", "gzip", "http2", "json", "multipart", "query", "rustls", "socks", "stream", "zstd"] } rustls = { version = "^0.23", features = ["aws-lc-rs"] } serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1.0", features = [] } -solana-account-decoder-client-types = { version = "4.0.0-beta.7", features = ["zstd"] } -solana-address-lookup-table-interface = { version = "^3.0", features = ["bincode", "serde"] } -solana-client = { version = "^3.1", features = [] } +solana-account-decoder-client-types = { version = ">=4.0.0-rc.0", features = ["zstd"] } +solana-address-lookup-table-interface = { version = "^3.1", features = ["bincode", "serde"] } +solana-client = { version = ">=4.0.0-beta.7", features = [] } solana-compute-budget-interface = { version = "^3.0", features = ["borsh", "serde"] } -solana-rpc-client-api = { version = "4.0.0-beta.7", features = [] } -solana-rpc-client-types = { version = "4.0.0-beta.7", features = [] } +solana-rpc-client-api = { version = ">=4.0.0-rc.0", features = [] } +solana-rpc-client-types = { version = ">=4.0.0-rc.0", features = [] } solana-sdk = { version = "^4.0", features = ["full"] } solana-sdk-ids = { version = "^3.1", features = [] } -solana-system-interface = { version = "^3.0", features = ["alloc", "bincode", "serde", "std"] } -solana-transaction-status-client-types = { version = "4.0.0-beta.7", features = [] } +solana-system-interface = { version = "^3.2", features = ["alloc", "bincode", "serde", "std"] } +solana-transaction-status-client-types = { version = ">=4.0.0-rc.0", features = [] } spl-associated-token-account-interface = { version = "^2.0", features = ["borsh"] } spl-memo-interface = { version = "^2.0", features = [] } spl-token-interface = { version = "^2.0", features = [] } diff --git a/kb_app/frontend/ts/demo_pipeline2.ts b/kb_app/frontend/ts/demo_pipeline2.ts index e79763a..d078122 100644 --- a/kb_app/frontend/ts/demo_pipeline2.ts +++ b/kb_app/frontend/ts/demo_pipeline2.ts @@ -268,6 +268,12 @@ function renderCandlesChart( name: "OHLC", type: "candlestick", data: ohlcData, + itemStyle: { + color: "#16a34a", + color0: "#dc2626", + borderColor: "#15803d", + borderColor0: "#b91c1c", + }, }, { name: "Volume", diff --git a/kb_lib/Cargo.toml b/kb_lib/Cargo.toml index 6c05746..88ff73b 100644 --- a/kb_lib/Cargo.toml +++ b/kb_lib/Cargo.toml @@ -10,6 +10,7 @@ publish.workspace = true [dependencies] chrono.workspace = true +bs58.workspace = true futures-util.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/kb_lib/src/dex.rs b/kb_lib/src/dex.rs index cf30871..ada1e88 100644 --- a/kb_lib/src/dex.rs +++ b/kb_lib/src/dex.rs @@ -12,45 +12,46 @@ mod pump_fun; mod pump_swap; mod raydium_amm_v4; -pub use dexlab::KB_DEXLAB_PROGRAM_ID; pub use dexlab::KbDexlabCreatePoolDecoded; pub use dexlab::KbDexlabDecodedEvent; pub use dexlab::KbDexlabDecoder; pub use dexlab::KbDexlabSwapDecoded; -pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID; +pub use dexlab::KB_DEXLAB_PROGRAM_ID; pub use fluxbeam::KbFluxbeamCreatePoolDecoded; pub use fluxbeam::KbFluxbeamDecodedEvent; pub use fluxbeam::KbFluxbeamDecoder; pub use fluxbeam::KbFluxbeamSwapDecoded; -pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID; +pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID; pub use meteora_damm_v1::KbMeteoraDammV1CreatePoolDecoded; pub use meteora_damm_v1::KbMeteoraDammV1DecodedEvent; pub use meteora_damm_v1::KbMeteoraDammV1Decoder; pub use meteora_damm_v1::KbMeteoraDammV1SwapDecoded; -pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID; +pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID; pub use meteora_damm_v2::KbMeteoraDammV2CreatePoolDecoded; pub use meteora_damm_v2::KbMeteoraDammV2DecodedEvent; pub use meteora_damm_v2::KbMeteoraDammV2Decoder; pub use meteora_damm_v2::KbMeteoraDammV2SwapDecoded; -pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID; +pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID; pub use meteora_dbc::KbMeteoraDbcCreatePoolDecoded; pub use meteora_dbc::KbMeteoraDbcDecodedEvent; pub use meteora_dbc::KbMeteoraDbcDecoder; pub use meteora_dbc::KbMeteoraDbcSwapDecoded; -pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; +pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID; pub use orca_whirlpools::KbOrcaWhirlpoolsCreatePoolDecoded; pub use orca_whirlpools::KbOrcaWhirlpoolsDecodedEvent; pub use orca_whirlpools::KbOrcaWhirlpoolsDecoder; pub use orca_whirlpools::KbOrcaWhirlpoolsSwapDecoded; -pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID; +pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; pub use pump_fun::KbPumpFunCreateV2TokenDecoded; pub use pump_fun::KbPumpFunDecodedEvent; pub use pump_fun::KbPumpFunDecoder; -pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID; +pub use pump_fun::KbPumpFunTradeDecoded; +pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID; pub use pump_swap::KbPumpSwapDecodedEvent; pub use pump_swap::KbPumpSwapDecoder; pub use pump_swap::KbPumpSwapTradeDecoded; -pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID; pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent; pub use raydium_amm_v4::KbRaydiumAmmV4Decoder; pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded; +pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; diff --git a/kb_lib/src/dex/pump_fun.rs b/kb_lib/src/dex/pump_fun.rs index c0279e4..7b203b4 100644 --- a/kb_lib/src/dex/pump_fun.rs +++ b/kb_lib/src/dex/pump_fun.rs @@ -5,6 +5,9 @@ /// Pump.fun program id. pub const KB_PUMP_FUN_PROGRAM_ID: &str = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"; +const KB_PUMP_FUN_BUY_DISCRIMINATOR: [u8; 8] = [102, 6, 61, 18, 1, 218, 235, 234]; +const KB_PUMP_FUN_SELL_DISCRIMINATOR: [u8; 8] = [51, 230, 133, 164, 1, 127, 131, 173]; + /// Decoded Pump.fun `create_v2` token event. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct KbPumpFunCreateV2TokenDecoded { @@ -28,11 +31,46 @@ pub struct KbPumpFunCreateV2TokenDecoded { pub payload_json: serde_json::Value, } +/// Decoded Pump.fun bonding-curve trade event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbPumpFunTradeDecoded { + /// Parent transaction id. + pub transaction_id: i64, + /// Parent instruction id. + pub instruction_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Program id. + pub program_id: std::string::String, + /// Trade side. + pub trade_side: crate::KbSwapTradeSide, + /// Token mint. + pub mint: std::option::Option, + /// Bonding curve account. + pub bonding_curve: std::option::Option, + /// Associated bonding curve token account. + pub associated_bonding_curve: std::option::Option, + /// User token account. + pub associated_user: std::option::Option, + /// User wallet account. + pub user: std::option::Option, + /// Decoded instruction amount, when available. + pub amount_raw: std::option::Option, + /// Decoded SOL limit/threshold argument, when available. + pub sol_limit_raw: std::option::Option, + /// Decoded payload. + pub payload_json: serde_json::Value, +} + /// Decoded Pump.fun event. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum KbPumpFunDecodedEvent { /// `create_v2` token creation. CreateV2Token(KbPumpFunCreateV2TokenDecoded), + /// Buy trade. + BuyTrade(KbPumpFunTradeDecoded), + /// Sell trade. + SellTrade(KbPumpFunTradeDecoded), } /// Pump.fun decoder. @@ -61,6 +99,9 @@ impl KbPumpFunDecoder { ))); } }; + if transaction.err_json.is_some() { + return Ok(std::vec::Vec::new()); + } let transaction_json_result = serde_json::from_str::(transaction.transaction_json.as_str()); let transaction_json = match transaction_json_result { @@ -88,9 +129,6 @@ impl KbPumpFunDecoder { if program_id.as_str() != crate::KB_PUMP_FUN_PROGRAM_ID { continue; } - if !has_create_v2_log { - continue; - } let instruction_id_option = instruction.id; let instruction_id = match instruction_id_option { Some(instruction_id) => instruction_id, @@ -101,6 +139,83 @@ impl KbPumpFunDecoder { Ok(accounts) => accounts, Err(error) => return Err(error), }; + let instruction_data = + kb_decode_optional_instruction_data(instruction.data_json.as_ref()); + let is_buy = kb_instruction_data_starts_with( + instruction_data.as_deref(), + &KB_PUMP_FUN_BUY_DISCRIMINATOR, + ); + let is_sell = kb_instruction_data_starts_with( + instruction_data.as_deref(), + &KB_PUMP_FUN_SELL_DISCRIMINATOR, + ); + if is_buy || is_sell { + if accounts.len() < 7 { + continue; + } + let amount_raw = kb_extract_u64_argument(instruction_data.as_deref(), 8); + let sol_limit_raw = kb_extract_u64_argument(instruction_data.as_deref(), 16); + let mint = kb_extract_account(&accounts, 2); + let bonding_curve = kb_extract_account(&accounts, 3); + let associated_bonding_curve = kb_extract_account(&accounts, 4); + let associated_user = kb_extract_account(&accounts, 5); + let user = kb_extract_account(&accounts, 6); + let fee_recipient = kb_extract_account(&accounts, 1); + let token_program = kb_extract_account(&accounts, 8); + let creator_vault = kb_extract_account(&accounts, 9); + let payload_json = serde_json::json!({ + "decoder": "pump_fun", + "signature": transaction.signature, + "instructionId": instruction_id, + "instructionIndex": instruction.instruction_index, + "accounts": accounts, + "logMessages": log_messages, + "eventKind": if is_buy { "buy" } else { "sell" }, + "poolAccount": bonding_curve, + "tokenAMint": mint, + "tokenBMint": crate::WSOL_MINT_ID, + "bondingCurve": bonding_curve, + "associatedBondingCurve": associated_bonding_curve, + "associatedUser": associated_user, + "user": user, + "feeRecipient": fee_recipient, + "tokenProgram": token_program, + "creatorVault": creator_vault, + "poolBaseTokenAccount": associated_bonding_curve, + "poolQuoteNativeAccount": bonding_curve, + "amountRaw": amount_raw, + "solLimitRaw": sol_limit_raw, + "tradeSide": if is_buy { "BuyBase" } else { "SellBase" } + }); + let event = crate::KbPumpFunTradeDecoded { + transaction_id, + instruction_id, + signature: transaction.signature.clone(), + program_id: program_id.clone(), + trade_side: if is_buy { + crate::KbSwapTradeSide::BuyBase + } else { + crate::KbSwapTradeSide::SellBase + }, + mint, + bonding_curve, + associated_bonding_curve, + associated_user, + user, + amount_raw, + sol_limit_raw, + payload_json, + }; + if is_buy { + decoded_events.push(crate::KbPumpFunDecodedEvent::BuyTrade(event)); + } else { + decoded_events.push(crate::KbPumpFunDecodedEvent::SellTrade(event)); + } + continue; + } + if !has_create_v2_log { + continue; + } if accounts.len() < 6 { continue; } @@ -139,6 +254,59 @@ impl KbPumpFunDecoder { } } +fn kb_decode_optional_instruction_data( + data_json: std::option::Option<&std::string::String>, +) -> std::option::Option> { + let data_json = match data_json { + Some(data_json) => data_json, + None => return None, + }; + let parsed_result = serde_json::from_str::(data_json.as_str()); + let encoded = match parsed_result { + Ok(parsed) => match parsed.as_str() { + Some(text) => text.to_string(), + None => data_json.clone(), + }, + Err(_) => data_json.clone(), + }; + let decode_result = bs58::decode(encoded.as_str()).into_vec(); + match decode_result { + Ok(decoded) => Some(decoded), + Err(_) => None, + } +} + +fn kb_instruction_data_starts_with( + instruction_data: std::option::Option<&[u8]>, + discriminator: &[u8; 8], +) -> bool { + let instruction_data = match instruction_data { + Some(instruction_data) => instruction_data, + None => return false, + }; + if instruction_data.len() < discriminator.len() { + return false; + } + &instruction_data[0..discriminator.len()] == discriminator +} + +fn kb_extract_u64_argument( + instruction_data: std::option::Option<&[u8]>, + offset: usize, +) -> std::option::Option { + let instruction_data = match instruction_data { + Some(instruction_data) => instruction_data, + None => return None, + }; + let end = offset + 8; + if instruction_data.len() < end { + return None; + } + let mut bytes = [0u8; 8]; + bytes.copy_from_slice(&instruction_data[offset..end]); + Some(u64::from_le_bytes(bytes).to_string()) +} + fn kb_extract_log_messages( transaction_json: &serde_json::Value, ) -> std::vec::Vec { @@ -167,10 +335,7 @@ fn kb_extract_log_messages( messages } -fn kb_log_messages_contain_keyword( - log_messages: &[std::string::String], - keyword: &str, -) -> bool { +fn kb_log_messages_contain_keyword(log_messages: &[std::string::String], keyword: &str) -> bool { let keyword_normalized = kb_normalize_log_text(keyword); for log_message in log_messages { let log_normalized = kb_normalize_log_text(log_message.as_str()); @@ -306,6 +471,12 @@ mod tests { ); assert_eq!(event.creator, Some("Creator111".to_string())); } + crate::KbPumpFunDecodedEvent::BuyTrade(_) => { + panic!("unexpected pump_fun buy trade event"); + } + crate::KbPumpFunDecodedEvent::SellTrade(_) => { + panic!("unexpected pump_fun sell trade event"); + } } } diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs index 1236af4..d550ac0 100644 --- a/kb_lib/src/dex_decode.rs +++ b/kb_lib/src/dex_decode.rs @@ -1617,9 +1617,130 @@ impl KbDexDecodeService { } Ok(fetched) } + crate::KbPumpFunDecodedEvent::BuyTrade(event) => { + self.persist_pump_fun_trade_event( + transaction, + event, + "pump_fun.buy", + "signal.dex.pump_fun.buy", + "dex.pump_fun.buy", + ) + .await + } + crate::KbPumpFunDecodedEvent::SellTrade(event) => { + self.persist_pump_fun_trade_event( + transaction, + event, + "pump_fun.sell", + "signal.dex.pump_fun.sell", + "dex.pump_fun.sell", + ) + .await + } } } + async fn persist_pump_fun_trade_event( + &self, + transaction: &crate::KbChainTransactionDto, + event: &crate::KbPumpFunTradeDecoded, + event_kind: &str, + signal_kind: &str, + observation_kind: &str, + ) -> Result { + let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize decoded pump.fun trade payload: {}", + error + ))); + } + }; + let existing_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + event.transaction_id, + Some(event.instruction_id), + event_kind, + ) + .await; + let existing_option = match existing_result { + Ok(existing_option) => existing_option, + Err(error) => return Err(error), + }; + let already_present = existing_option.is_some(); + let dto = crate::KbDexDecodedEventDto::new( + event.transaction_id, + Some(event.instruction_id), + "pump_fun".to_string(), + event.program_id.clone(), + event_kind.to_string(), + event.bonding_curve.clone(), + None, + event.mint.clone(), + Some(crate::WSOL_MINT_ID.to_string()), + event.associated_bonding_curve.clone(), + payload_json, + ); + let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await; + if let Err(error) = upsert_result { + return Err(error); + } + let fetched_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + event.transaction_id, + Some(event.instruction_id), + event_kind, + ) + .await; + let fetched_option = match fetched_result { + Ok(fetched_option) => fetched_option, + Err(error) => return Err(error), + }; + let fetched = match fetched_option { + Some(fetched) => fetched, + None => { + return Err(crate::KbError::InvalidState( + "decoded pump.fun trade event disappeared after upsert".to_string(), + )); + } + }; + if !already_present { + let payload_value = event.payload_json.clone(); + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + observation_kind.to_string(), + crate::KbObservationSourceKind::HttpRpc, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload_value.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + signal_kind.to_string(), + crate::KbAnalysisSignalSeverity::Low, + transaction.signature.clone(), + Some(observation_id), + None, + payload_value, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(fetched) + } + async fn persist_pump_swap_event( &self, transaction: &crate::KbChainTransactionDto, diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs index a1d9b33..e996f74 100644 --- a/kb_lib/src/dex_detect.rs +++ b/kb_lib/src/dex_detect.rs @@ -105,6 +105,30 @@ impl KbDexDetectService { }; detection_results.push(detect_result); } + if decoded_event.protocol_name == "pump_fun" + && decoded_event.event_kind == "pump_fun.buy" + { + let detect_result = self + .detect_pump_fun_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } + if decoded_event.protocol_name == "pump_fun" + && decoded_event.event_kind == "pump_fun.sell" + { + let detect_result = self + .detect_pump_fun_trade(&transaction, decoded_event) + .await; + let detect_result = match detect_result { + Ok(detect_result) => detect_result, + Err(error) => return Err(error), + }; + detection_results.push(detect_result); + } if decoded_event.protocol_name == "pump_swap" && decoded_event.event_kind == "pump_swap.buy" { @@ -773,6 +797,243 @@ impl KbDexDetectService { }) } + async fn detect_pump_fun_trade( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbDexDecodedEventDto, + ) -> Result { + let decoded_event_id_option = decoded_event.id; + let decoded_event_id = match decoded_event_id_option { + Some(decoded_event_id) => decoded_event_id, + None => { + return Err(crate::KbError::InvalidState( + "decoded dex event has no internal id".to_string(), + )); + } + }; + let dex_id_result = self.ensure_pump_fun_dex().await; + let dex_id = match dex_id_result { + Ok(dex_id) => dex_id, + Err(error) => return Err(error), + }; + let pool_address_option = decoded_event.pool_account.clone(); + let pool_address = match pool_address_option { + Some(pool_address) => pool_address, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no pool_account", + decoded_event_id + ))); + } + }; + let token_a_mint_option = decoded_event.token_a_mint.clone(); + let token_a_mint = match token_a_mint_option { + Some(token_a_mint) => token_a_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_a_mint", + decoded_event_id + ))); + } + }; + let token_b_mint_option = decoded_event.token_b_mint.clone(); + let token_b_mint = match token_b_mint_option { + Some(token_b_mint) => token_b_mint, + None => { + return Err(crate::KbError::InvalidState(format!( + "decoded event '{}' has no token_b_mint", + decoded_event_id + ))); + } + }; + let base_is_token_a = + kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str()); + let base_mint = if base_is_token_a { + token_a_mint.clone() + } else { + token_b_mint.clone() + }; + let quote_mint = if base_is_token_a { + token_b_mint.clone() + } else { + token_a_mint.clone() + }; + let base_token_id_result = self.ensure_token(base_mint.as_str()).await; + let base_token_id = match base_token_id_result { + Ok(base_token_id) => base_token_id, + Err(error) => return Err(error), + }; + let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str()); + let payload_value = match payload_value_result { + Ok(payload_value) => payload_value, + Err(error) => return Err(error), + }; + let vault_addresses = kb_extract_pump_fun_vault_addresses(&payload_value); + let token_a_vault_address = vault_addresses.0; + let token_b_vault_address = vault_addresses.1; + + let base_vault_address = if base_is_token_a { + token_a_vault_address.clone() + } else { + token_b_vault_address.clone() + }; + let quote_vault_address = if base_is_token_a { + token_b_vault_address.clone() + } else { + token_a_vault_address.clone() + }; + let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await; + let quote_token_id = match quote_token_id_result { + Ok(quote_token_id) => quote_token_id, + Err(error) => return Err(error), + }; + let existing_pool_result = + crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await; + let existing_pool_option = match existing_pool_result { + Ok(existing_pool_option) => existing_pool_option, + Err(error) => return Err(error), + }; + let created_pool = existing_pool_option.is_none(); + let pool_id = match existing_pool_option { + Some(pool) => { + let pool_id_option = pool.id; + match pool_id_option { + Some(pool_id) => pool_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "pool '{}' has no internal id", + pool.address + ))); + } + } + } + None => { + let pool_dto = crate::KbPoolDto::new( + dex_id, + pool_address.clone(), + crate::KbPoolKind::BondingCurve, + crate::KbPoolStatus::Active, + ); + let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + match upsert_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + } + } + }; + let existing_pair_result = + crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_pair_option = match existing_pair_result { + Ok(existing_pair_option) => existing_pair_option, + Err(error) => return Err(error), + }; + let created_pair = existing_pair_option.is_none(); + let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); + let pair_dto = + crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol); + let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + let pair_id = match pair_id_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), + }; + let upsert_base_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + base_token_id, + crate::KbPoolTokenRole::Base, + base_vault_address, + Some(0), + ), + ) + .await; + if let Err(error) = upsert_base_pool_token_result { + return Err(error); + } + let upsert_quote_pool_token_result = crate::upsert_pool_token( + self.database.as_ref(), + &crate::KbPoolTokenDto::new( + pool_id, + quote_token_id, + crate::KbPoolTokenRole::Quote, + quote_vault_address, + Some(1), + ), + ) + .await; + if let Err(error) = upsert_quote_pool_token_result { + return Err(error); + } + let existing_listing_result = + crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await; + let existing_listing_option = match existing_listing_result { + Ok(existing_listing_option) => existing_listing_option, + Err(error) => return Err(error), + }; + let created_listing = existing_listing_option.is_none(); + let pool_listing_id = match existing_listing_option { + Some(pool_listing) => pool_listing.id, + None => { + let listing_id_result = self + .upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction) + .await; + match listing_id_result { + Ok(listing_id) => Some(listing_id), + Err(error) => return Err(error), + } + } + }; + if created_pool { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_fun.new_pool", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_pair { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_fun.new_pair", + crate::KbAnalysisSignalSeverity::Low, + payload_value.clone(), + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + if created_listing { + let signal_result = self + .record_detection_signal( + transaction, + "signal.dex.pump_fun.first_listing_seen", + crate::KbAnalysisSignalSeverity::Low, + payload_value, + ) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(crate::KbDexPoolDetectionResult { + decoded_event_id, + dex_id, + pool_id, + pair_id, + pool_listing_id, + created_pool, + created_pair, + created_listing, + }) + } + async fn detect_pump_swap_trade( &self, transaction: &crate::KbChainTransactionDto, @@ -905,13 +1166,8 @@ impl KbDexDetectService { }; let created_pair = existing_pair_option.is_none(); let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); - let pair_dto = crate::KbPairDto::new( - dex_id, - pool_id, - base_token_id, - quote_token_id, - pair_symbol, - ); + let pair_dto = + crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol); let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; let pair_id = match pair_id_result { Ok(pair_id) => pair_id, @@ -2862,6 +3118,27 @@ fn kb_extract_string_from_array_index( Some(text.to_string()) } +fn kb_extract_pump_fun_vault_addresses( + payload_value: &serde_json::Value, +) -> ( + std::option::Option, + std::option::Option, +) { + let accounts_option = payload_value.get("accounts"); + let accounts = match accounts_option { + Some(accounts) => accounts, + None => return (None, None), + }; + let accounts_array_option = accounts.as_array(); + let accounts_array = match accounts_array_option { + Some(accounts_array) => accounts_array, + None => return (None, None), + }; + let token_a_vault_address = kb_extract_string_from_array_index(accounts_array, 4); + let token_b_native_address = kb_extract_string_from_array_index(accounts_array, 3); + (token_a_vault_address, token_b_native_address) +} + fn kb_extract_pump_swap_vault_addresses( payload_value: &serde_json::Value, ) -> ( diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index cfa9532..488973f 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -49,82 +49,6 @@ pub use config::KbSolanaConfig; pub use config::KbSqliteDatabaseConfig; pub use config::KbWsEndpointConfig; pub use constants::*; -pub use db::KbAnalysisSignalDto; -pub use db::KbAnalysisSignalEntity; -pub use db::KbAnalysisSignalSeverity; -pub use db::KbChainInstructionDto; -pub use db::KbChainInstructionEntity; -pub use db::KbChainSlotDto; -pub use db::KbChainSlotEntity; -pub use db::KbChainTransactionDto; -pub use db::KbChainTransactionEntity; -pub use db::KbDatabase; -pub use db::KbDatabaseBackend; -pub use db::KbDatabaseConnection; -pub use db::KbDbMetadataDto; -pub use db::KbDbMetadataEntity; -pub use db::KbDbRuntimeEventDto; -pub use db::KbDbRuntimeEventEntity; -pub use db::KbDbRuntimeEventLevel; -pub use db::KbDexDecodedEventDto; -pub use db::KbDexDecodedEventEntity; -pub use db::KbDexDto; -pub use db::KbDexEntity; -pub use db::KbKnownHttpEndpointDto; -pub use db::KbKnownHttpEndpointEntity; -pub use db::KbKnownWsEndpointDto; -pub use db::KbKnownWsEndpointEntity; -pub use db::KbLaunchAttributionDto; -pub use db::KbLaunchAttributionEntity; -pub use db::KbLaunchSurfaceDto; -pub use db::KbLaunchSurfaceEntity; -pub use db::KbLaunchSurfaceKeyDto; -pub use db::KbLaunchSurfaceKeyEntity; -pub use db::KbLiquidityEventDto; -pub use db::KbLiquidityEventEntity; -pub use db::KbLiquidityEventKind; -pub use db::KbObservationSourceKind; -pub use db::KbObservedTokenDto; -pub use db::KbObservedTokenEntity; -pub use db::KbObservedTokenStatus; -pub use db::KbOnchainObservationDto; -pub use db::KbOnchainObservationEntity; -pub use db::KbPairAnalyticSignalDto; -pub use db::KbPairAnalyticSignalEntity; -pub use db::KbPairCandleDto; -pub use db::KbPairCandleEntity; -pub use db::KbPairDto; -pub use db::KbPairEntity; -pub use db::KbPairMetricDto; -pub use db::KbPairMetricEntity; -pub use db::KbPoolDto; -pub use db::KbPoolEntity; -pub use db::KbPoolKind; -pub use db::KbPoolListingDto; -pub use db::KbPoolListingEntity; -pub use db::KbPoolOriginDto; -pub use db::KbPoolOriginEntity; -pub use db::KbPoolStatus; -pub use db::KbPoolTokenDto; -pub use db::KbPoolTokenEntity; -pub use db::KbPoolTokenRole; -pub use db::KbSwapDto; -pub use db::KbSwapEntity; -pub use db::KbSwapTradeSide; -pub use db::KbTokenBurnEventDto; -pub use db::KbTokenBurnEventEntity; -pub use db::KbTokenDto; -pub use db::KbTokenEntity; -pub use db::KbTokenMintEventDto; -pub use db::KbTokenMintEventEntity; -pub use db::KbTradeEventDto; -pub use db::KbTradeEventEntity; -pub use db::KbWalletDto; -pub use db::KbWalletEntity; -pub use db::KbWalletHoldingDto; -pub use db::KbWalletHoldingEntity; -pub use db::KbWalletParticipationDto; -pub use db::KbWalletParticipationEntity; pub use db::delete_chain_instructions_by_transaction_id; pub use db::get_chain_slot; pub use db::get_chain_transaction_by_signature; @@ -215,6 +139,82 @@ pub use db::upsert_trade_event; pub use db::upsert_wallet; pub use db::upsert_wallet_holding; pub use db::upsert_wallet_participation; +pub use db::KbAnalysisSignalDto; +pub use db::KbAnalysisSignalEntity; +pub use db::KbAnalysisSignalSeverity; +pub use db::KbChainInstructionDto; +pub use db::KbChainInstructionEntity; +pub use db::KbChainSlotDto; +pub use db::KbChainSlotEntity; +pub use db::KbChainTransactionDto; +pub use db::KbChainTransactionEntity; +pub use db::KbDatabase; +pub use db::KbDatabaseBackend; +pub use db::KbDatabaseConnection; +pub use db::KbDbMetadataDto; +pub use db::KbDbMetadataEntity; +pub use db::KbDbRuntimeEventDto; +pub use db::KbDbRuntimeEventEntity; +pub use db::KbDbRuntimeEventLevel; +pub use db::KbDexDecodedEventDto; +pub use db::KbDexDecodedEventEntity; +pub use db::KbDexDto; +pub use db::KbDexEntity; +pub use db::KbKnownHttpEndpointDto; +pub use db::KbKnownHttpEndpointEntity; +pub use db::KbKnownWsEndpointDto; +pub use db::KbKnownWsEndpointEntity; +pub use db::KbLaunchAttributionDto; +pub use db::KbLaunchAttributionEntity; +pub use db::KbLaunchSurfaceDto; +pub use db::KbLaunchSurfaceEntity; +pub use db::KbLaunchSurfaceKeyDto; +pub use db::KbLaunchSurfaceKeyEntity; +pub use db::KbLiquidityEventDto; +pub use db::KbLiquidityEventEntity; +pub use db::KbLiquidityEventKind; +pub use db::KbObservationSourceKind; +pub use db::KbObservedTokenDto; +pub use db::KbObservedTokenEntity; +pub use db::KbObservedTokenStatus; +pub use db::KbOnchainObservationDto; +pub use db::KbOnchainObservationEntity; +pub use db::KbPairAnalyticSignalDto; +pub use db::KbPairAnalyticSignalEntity; +pub use db::KbPairCandleDto; +pub use db::KbPairCandleEntity; +pub use db::KbPairDto; +pub use db::KbPairEntity; +pub use db::KbPairMetricDto; +pub use db::KbPairMetricEntity; +pub use db::KbPoolDto; +pub use db::KbPoolEntity; +pub use db::KbPoolKind; +pub use db::KbPoolListingDto; +pub use db::KbPoolListingEntity; +pub use db::KbPoolOriginDto; +pub use db::KbPoolOriginEntity; +pub use db::KbPoolStatus; +pub use db::KbPoolTokenDto; +pub use db::KbPoolTokenEntity; +pub use db::KbPoolTokenRole; +pub use db::KbSwapDto; +pub use db::KbSwapEntity; +pub use db::KbSwapTradeSide; +pub use db::KbTokenBurnEventDto; +pub use db::KbTokenBurnEventEntity; +pub use db::KbTokenDto; +pub use db::KbTokenEntity; +pub use db::KbTokenMintEventDto; +pub use db::KbTokenMintEventEntity; +pub use db::KbTradeEventDto; +pub use db::KbTradeEventEntity; +pub use db::KbWalletDto; +pub use db::KbWalletEntity; +pub use db::KbWalletHoldingDto; +pub use db::KbWalletHoldingEntity; +pub use db::KbWalletParticipationDto; +pub use db::KbWalletParticipationEntity; pub use detect::KbDetectionObservationInput; pub use detect::KbDetectionPersistenceService; pub use detect::KbDetectionPoolCandidateInput; @@ -227,15 +227,6 @@ pub use detect::KbSolanaWsDetectionService; pub use detect::KbWsDetectionNotificationEnvelope; pub use detect::KbWsDetectionRelay; pub use detect::KbWsDetectionRelayStats; -pub use dex::KB_DEXLAB_PROGRAM_ID; -pub use dex::KB_FLUXBEAM_PROGRAM_ID; -pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID; -pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID; -pub use dex::KB_METEORA_DBC_PROGRAM_ID; -pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; -pub use dex::KB_PUMP_FUN_PROGRAM_ID; -pub use dex::KB_PUMP_SWAP_PROGRAM_ID; -pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; pub use dex::KbDexlabCreatePoolDecoded; pub use dex::KbDexlabDecodedEvent; pub use dex::KbDexlabDecoder; @@ -263,16 +254,28 @@ pub use dex::KbOrcaWhirlpoolsSwapDecoded; pub use dex::KbPumpFunCreateV2TokenDecoded; pub use dex::KbPumpFunDecodedEvent; pub use dex::KbPumpFunDecoder; +pub use dex::KbPumpFunTradeDecoded; pub use dex::KbPumpSwapDecodedEvent; pub use dex::KbPumpSwapDecoder; pub use dex::KbPumpSwapTradeDecoded; pub use dex::KbRaydiumAmmV4DecodedEvent; pub use dex::KbRaydiumAmmV4Decoder; pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded; +pub use dex::KB_DEXLAB_PROGRAM_ID; +pub use dex::KB_FLUXBEAM_PROGRAM_ID; +pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID; +pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID; +pub use dex::KB_METEORA_DBC_PROGRAM_ID; +pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID; +pub use dex::KB_PUMP_FUN_PROGRAM_ID; +pub use dex::KB_PUMP_SWAP_PROGRAM_ID; +pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; pub use dex_decode::KbDexDecodeService; pub use dex_detect::KbDexDetectService; pub use dex_detect::KbDexPoolDetectionResult; pub use error::KbError; +pub use http_client::parse_kb_json_rpc_http_response_text; +pub use http_client::parse_kb_json_rpc_http_response_value; pub use http_client::HttpClient; pub use http_client::KbHttpEndpointStatus; pub use http_client::KbHttpMethodClass; @@ -281,10 +284,11 @@ pub use http_client::KbJsonRpcHttpErrorResponse; pub use http_client::KbJsonRpcHttpRequest; pub use http_client::KbJsonRpcHttpResponse; pub use http_client::KbJsonRpcHttpSuccessResponse; -pub use http_client::parse_kb_json_rpc_http_response_text; -pub use http_client::parse_kb_json_rpc_http_response_value; pub use http_pool::HttpEndpointPool; pub use http_pool::KbHttpPoolClientSnapshot; +pub use json_rpc_ws::kb_is_probable_json_rpc_object_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use json_rpc_ws::KbJsonRpcWsErrorObject; pub use json_rpc_ws::KbJsonRpcWsErrorResponse; pub use json_rpc_ws::KbJsonRpcWsIncomingMessage; @@ -292,9 +296,6 @@ pub use json_rpc_ws::KbJsonRpcWsNotification; pub use json_rpc_ws::KbJsonRpcWsNotificationParams; pub use json_rpc_ws::KbJsonRpcWsRequest; pub use json_rpc_ws::KbJsonRpcWsSuccessResponse; -pub use json_rpc_ws::kb_is_probable_json_rpc_object_text; -pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; -pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use launch_origin::KbLaunchAttributionResult; pub use launch_origin::KbLaunchOriginService; pub use pair_analytic_signal::KbPairAnalyticSignalResult; @@ -304,14 +305,14 @@ pub use pair_candle_aggregation::KbPairCandleAggregationService; pub use pair_candle_query::KbPairCandleQueryService; pub use pool_origin::KbPoolOriginResult; pub use pool_origin::KbPoolOriginService; -pub use solana_pubsub_ws::KbSolanaWsTypedNotification; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; +pub use solana_pubsub_ws::KbSolanaWsTypedNotification; pub use token_backfill::KbPoolBackfillResult; pub use token_backfill::KbTokenBackfillResult; pub use token_backfill::KbTokenBackfillService; -pub use tracing::KbTracingGuard; pub use tracing::init_tracing; +pub use tracing::KbTracingGuard; pub use trade_aggregation::KbTradeAggregationResult; pub use trade_aggregation::KbTradeAggregationService; pub use tx_model::KbTransactionModelService; diff --git a/kb_lib/src/pair_candle_query.rs b/kb_lib/src/pair_candle_query.rs index 2824294..ae183a2 100644 --- a/kb_lib/src/pair_candle_query.rs +++ b/kb_lib/src/pair_candle_query.rs @@ -174,7 +174,7 @@ mod tests { }; std::sync::Arc::new(database) } - + async fn seed_fluxbeam_swap_transaction( database: std::sync::Arc, signature: &str, @@ -346,7 +346,7 @@ mod tests { seed_fluxbeam_swap_transaction( database.clone(), "sig-pair-candle-fallback-2", - 1_700_020_020, + 1_700_020_010, "1000", "3000", ) @@ -389,5 +389,4 @@ mod tests { assert_eq!(candles[0].close_price_quote_per_base, 3.0); assert_eq!(candles[0].trade_count, 2); } - } diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs index e231d86..25a0e04 100644 --- a/kb_lib/src/trade_aggregation.rs +++ b/kb_lib/src/trade_aggregation.rs @@ -203,6 +203,31 @@ impl KbTradeAggregationService { price_quote_per_base = inferred.2; } } + if decoded_event.event_kind.starts_with("pump_fun.") + && (base_amount_raw.is_none() + || quote_amount_raw.is_none() + || price_quote_per_base.is_none()) + { + let inferred_result = kb_extract_pump_fun_amounts_from_transaction( + transaction.transaction_json.as_str(), + transaction.meta_json.as_deref(), + base_vault_address.as_deref(), + quote_vault_address.as_deref(), + ); + let inferred = match inferred_result { + Ok(inferred) => inferred, + Err(error) => return Err(error), + }; + if base_amount_raw.is_none() { + base_amount_raw = inferred.0; + } + if quote_amount_raw.is_none() { + quote_amount_raw = inferred.1; + } + if price_quote_per_base.is_none() { + price_quote_per_base = inferred.2; + } + } if price_quote_per_base.is_none() { price_quote_per_base = kb_compute_price_quote_per_base_with_decimals( transaction.meta_json.as_deref(), @@ -211,6 +236,12 @@ impl KbTradeAggregationService { quote_vault_address.as_deref(), ); } + if price_quote_per_base.is_none() { + price_quote_per_base = kb_compute_price_quote_per_base_from_raw_amounts( + base_amount_raw.as_deref(), + quote_amount_raw.as_deref(), + ); + } let slot_i64 = kb_convert_slot_to_i64(transaction.slot); let created_trade_event = existing_trade_option.is_none(); let trade_event_dto = crate::KbTradeEventDto::new( @@ -637,6 +668,146 @@ fn kb_extract_pump_swap_amounts_from_transaction( Ok((base_amount_raw, quote_amount_raw, price_quote_per_base)) } +fn kb_extract_pump_fun_amounts_from_transaction( + transaction_json: &str, + meta_json: std::option::Option<&str>, + base_vault_address: std::option::Option<&str>, + quote_native_address: std::option::Option<&str>, +) -> Result< + ( + std::option::Option, + std::option::Option, + std::option::Option, + ), + crate::KbError, +> { + let meta_json = match meta_json { + Some(meta_json) => meta_json, + None => return Ok((None, None, None)), + }; + let transaction_value_result = serde_json::from_str::(transaction_json); + let transaction_value = match transaction_value_result { + Ok(transaction_value) => transaction_value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse transaction_json for pump_fun amount extraction: {}", + error + ))); + } + }; + let meta_value_result = serde_json::from_str::(meta_json); + let meta_value = match meta_value_result { + Ok(meta_value) => meta_value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse meta_json for pump_fun amount extraction: {}", + error + ))); + } + }; + let account_keys_result = kb_extract_transaction_account_keys(&transaction_value); + let account_keys = match account_keys_result { + Ok(account_keys) => account_keys, + Err(error) => return Err(error), + }; + let pre_balances_result = + kb_extract_token_balance_map(&meta_value, &account_keys, "preTokenBalances"); + let pre_balances = match pre_balances_result { + Ok(pre_balances) => pre_balances, + Err(error) => return Err(error), + }; + let post_balances_result = + kb_extract_token_balance_map(&meta_value, &account_keys, "postTokenBalances"); + let post_balances = match post_balances_result { + Ok(post_balances) => post_balances, + Err(error) => return Err(error), + }; + let mut base_amount_raw = None; + let mut quote_amount_raw = None; + let mut price_quote_per_base = None; + let mut base_delta_ui = None; + if let Some(base_vault_address) = base_vault_address { + let base_pre = pre_balances.get(base_vault_address); + let base_post = post_balances.get(base_vault_address); + let base_pre_raw = base_pre.map(|value| value.0.clone()); + let base_post_raw = base_post.map(|value| value.0.clone()); + base_amount_raw = kb_compute_amount_delta_abs(base_pre_raw, base_post_raw); + let base_pre_ui = base_pre.and_then(|value| value.1); + let base_post_ui = base_post.and_then(|value| value.1); + base_delta_ui = kb_compute_ui_delta_abs(base_pre_ui, base_post_ui); + } + if let Some(quote_native_address) = quote_native_address { + let quote_delta_result = kb_extract_native_balance_delta_by_address( + &meta_value, + &account_keys, + quote_native_address, + ); + let quote_delta = match quote_delta_result { + Ok(quote_delta) => quote_delta, + Err(error) => return Err(error), + }; + if let Some(quote_delta_lamports) = quote_delta { + quote_amount_raw = Some(quote_delta_lamports.to_string()); + let quote_delta_ui = quote_delta_lamports as f64 / 1_000_000_000.0; + if let Some(base_delta_ui) = base_delta_ui { + if base_delta_ui > 0.0 { + price_quote_per_base = Some(quote_delta_ui / base_delta_ui); + } + } + } + } + Ok((base_amount_raw, quote_amount_raw, price_quote_per_base)) +} + +fn kb_extract_native_balance_delta_by_address( + meta_value: &serde_json::Value, + account_keys: &[std::string::String], + address: &str, +) -> Result, crate::KbError> { + let mut account_index = None; + for (index, account_key) in account_keys.iter().enumerate() { + if account_key.as_str() == address { + account_index = Some(index); + break; + } + } + let account_index = match account_index { + Some(account_index) => account_index, + None => return Ok(None), + }; + let pre_balances_option = meta_value + .get("preBalances") + .and_then(|value| value.as_array()); + let post_balances_option = meta_value + .get("postBalances") + .and_then(|value| value.as_array()); + let pre_balances = match pre_balances_option { + Some(pre_balances) => pre_balances, + None => return Ok(None), + }; + let post_balances = match post_balances_option { + Some(post_balances) => post_balances, + None => return Ok(None), + }; + if account_index >= pre_balances.len() || account_index >= post_balances.len() { + return Ok(None); + } + let pre_balance_option = pre_balances[account_index].as_u64(); + let post_balance_option = post_balances[account_index].as_u64(); + let pre_balance = match pre_balance_option { + Some(pre_balance) => pre_balance, + None => return Ok(None), + }; + let post_balance = match post_balance_option { + Some(post_balance) => post_balance, + None => return Ok(None), + }; + if post_balance >= pre_balance { + return Ok(Some(post_balance - pre_balance)); + } + Ok(Some(pre_balance - post_balance)) +} + fn kb_extract_transaction_account_keys( transaction_value: &serde_json::Value, ) -> Result, crate::KbError> { @@ -792,6 +963,37 @@ fn kb_compute_ui_delta_abs( Some(delta) } +fn kb_compute_price_quote_per_base_from_raw_amounts( + base_amount_raw: std::option::Option<&str>, + quote_amount_raw: std::option::Option<&str>, +) -> std::option::Option { + let base_amount_raw = match base_amount_raw { + Some(base_amount_raw) => base_amount_raw.trim(), + None => return None, + }; + let quote_amount_raw = match quote_amount_raw { + Some(quote_amount_raw) => quote_amount_raw.trim(), + None => return None, + }; + if base_amount_raw.is_empty() || quote_amount_raw.is_empty() { + return None; + } + let base_amount_result = base_amount_raw.parse::(); + let base_amount = match base_amount_result { + Ok(base_amount) => base_amount, + Err(_) => return None, + }; + let quote_amount_result = quote_amount_raw.parse::(); + let quote_amount = match quote_amount_result { + Ok(quote_amount) => quote_amount, + Err(_) => return None, + }; + if base_amount <= 0.0 { + return None; + } + Some(quote_amount / base_amount) +} + fn kb_compute_price_quote_per_base_with_decimals( meta_json: std::option::Option<&str>, transaction_json: &str,