0.7.24-pre.2

This commit is contained in:
2026-05-03 07:03:23 +02:00
parent d10a2270d8
commit d44171ca6f
10 changed files with 906 additions and 126 deletions

View File

@@ -10,6 +10,7 @@ publish.workspace = true
[dependencies]
chrono.workspace = true
bs58.workspace = true
futures-util.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@@ -12,45 +12,46 @@ mod pump_fun;
mod pump_swap;
mod raydium_amm_v4;
pub use dexlab::KB_DEXLAB_PROGRAM_ID;
pub use dexlab::KbDexlabCreatePoolDecoded;
pub use dexlab::KbDexlabDecodedEvent;
pub use dexlab::KbDexlabDecoder;
pub use dexlab::KbDexlabSwapDecoded;
pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID;
pub use dexlab::KB_DEXLAB_PROGRAM_ID;
pub use fluxbeam::KbFluxbeamCreatePoolDecoded;
pub use fluxbeam::KbFluxbeamDecodedEvent;
pub use fluxbeam::KbFluxbeamDecoder;
pub use fluxbeam::KbFluxbeamSwapDecoded;
pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use fluxbeam::KB_FLUXBEAM_PROGRAM_ID;
pub use meteora_damm_v1::KbMeteoraDammV1CreatePoolDecoded;
pub use meteora_damm_v1::KbMeteoraDammV1DecodedEvent;
pub use meteora_damm_v1::KbMeteoraDammV1Decoder;
pub use meteora_damm_v1::KbMeteoraDammV1SwapDecoded;
pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use meteora_damm_v1::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use meteora_damm_v2::KbMeteoraDammV2CreatePoolDecoded;
pub use meteora_damm_v2::KbMeteoraDammV2DecodedEvent;
pub use meteora_damm_v2::KbMeteoraDammV2Decoder;
pub use meteora_damm_v2::KbMeteoraDammV2SwapDecoded;
pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID;
pub use meteora_damm_v2::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use meteora_dbc::KbMeteoraDbcCreatePoolDecoded;
pub use meteora_dbc::KbMeteoraDbcDecodedEvent;
pub use meteora_dbc::KbMeteoraDbcDecoder;
pub use meteora_dbc::KbMeteoraDbcSwapDecoded;
pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use meteora_dbc::KB_METEORA_DBC_PROGRAM_ID;
pub use orca_whirlpools::KbOrcaWhirlpoolsCreatePoolDecoded;
pub use orca_whirlpools::KbOrcaWhirlpoolsDecodedEvent;
pub use orca_whirlpools::KbOrcaWhirlpoolsDecoder;
pub use orca_whirlpools::KbOrcaWhirlpoolsSwapDecoded;
pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID;
pub use orca_whirlpools::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use pump_fun::KbPumpFunCreateV2TokenDecoded;
pub use pump_fun::KbPumpFunDecodedEvent;
pub use pump_fun::KbPumpFunDecoder;
pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID;
pub use pump_fun::KbPumpFunTradeDecoded;
pub use pump_fun::KB_PUMP_FUN_PROGRAM_ID;
pub use pump_swap::KbPumpSwapDecodedEvent;
pub use pump_swap::KbPumpSwapDecoder;
pub use pump_swap::KbPumpSwapTradeDecoded;
pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use pump_swap::KB_PUMP_SWAP_PROGRAM_ID;
pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent;
pub use raydium_amm_v4::KbRaydiumAmmV4Decoder;
pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID;

View File

@@ -5,6 +5,9 @@
/// Pump.fun program id.
pub const KB_PUMP_FUN_PROGRAM_ID: &str = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";
const KB_PUMP_FUN_BUY_DISCRIMINATOR: [u8; 8] = [102, 6, 61, 18, 1, 218, 235, 234];
const KB_PUMP_FUN_SELL_DISCRIMINATOR: [u8; 8] = [51, 230, 133, 164, 1, 127, 131, 173];
/// Decoded Pump.fun `create_v2` token event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbPumpFunCreateV2TokenDecoded {
@@ -28,11 +31,46 @@ pub struct KbPumpFunCreateV2TokenDecoded {
pub payload_json: serde_json::Value,
}
/// Decoded Pump.fun bonding-curve trade event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbPumpFunTradeDecoded {
/// Parent transaction id.
pub transaction_id: i64,
/// Parent instruction id.
pub instruction_id: i64,
/// Transaction signature.
pub signature: std::string::String,
/// Program id.
pub program_id: std::string::String,
/// Trade side.
pub trade_side: crate::KbSwapTradeSide,
/// Token mint.
pub mint: std::option::Option<std::string::String>,
/// Bonding curve account.
pub bonding_curve: std::option::Option<std::string::String>,
/// Associated bonding curve token account.
pub associated_bonding_curve: std::option::Option<std::string::String>,
/// User token account.
pub associated_user: std::option::Option<std::string::String>,
/// User wallet account.
pub user: std::option::Option<std::string::String>,
/// Decoded instruction amount, when available.
pub amount_raw: std::option::Option<std::string::String>,
/// Decoded SOL limit/threshold argument, when available.
pub sol_limit_raw: std::option::Option<std::string::String>,
/// Decoded payload.
pub payload_json: serde_json::Value,
}
/// Decoded Pump.fun event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KbPumpFunDecodedEvent {
/// `create_v2` token creation.
CreateV2Token(KbPumpFunCreateV2TokenDecoded),
/// Buy trade.
BuyTrade(KbPumpFunTradeDecoded),
/// Sell trade.
SellTrade(KbPumpFunTradeDecoded),
}
/// Pump.fun decoder.
@@ -61,6 +99,9 @@ impl KbPumpFunDecoder {
)));
}
};
if transaction.err_json.is_some() {
return Ok(std::vec::Vec::new());
}
let transaction_json_result =
serde_json::from_str::<serde_json::Value>(transaction.transaction_json.as_str());
let transaction_json = match transaction_json_result {
@@ -88,9 +129,6 @@ impl KbPumpFunDecoder {
if program_id.as_str() != crate::KB_PUMP_FUN_PROGRAM_ID {
continue;
}
if !has_create_v2_log {
continue;
}
let instruction_id_option = instruction.id;
let instruction_id = match instruction_id_option {
Some(instruction_id) => instruction_id,
@@ -101,6 +139,83 @@ impl KbPumpFunDecoder {
Ok(accounts) => accounts,
Err(error) => return Err(error),
};
let instruction_data =
kb_decode_optional_instruction_data(instruction.data_json.as_ref());
let is_buy = kb_instruction_data_starts_with(
instruction_data.as_deref(),
&KB_PUMP_FUN_BUY_DISCRIMINATOR,
);
let is_sell = kb_instruction_data_starts_with(
instruction_data.as_deref(),
&KB_PUMP_FUN_SELL_DISCRIMINATOR,
);
if is_buy || is_sell {
if accounts.len() < 7 {
continue;
}
let amount_raw = kb_extract_u64_argument(instruction_data.as_deref(), 8);
let sol_limit_raw = kb_extract_u64_argument(instruction_data.as_deref(), 16);
let mint = kb_extract_account(&accounts, 2);
let bonding_curve = kb_extract_account(&accounts, 3);
let associated_bonding_curve = kb_extract_account(&accounts, 4);
let associated_user = kb_extract_account(&accounts, 5);
let user = kb_extract_account(&accounts, 6);
let fee_recipient = kb_extract_account(&accounts, 1);
let token_program = kb_extract_account(&accounts, 8);
let creator_vault = kb_extract_account(&accounts, 9);
let payload_json = serde_json::json!({
"decoder": "pump_fun",
"signature": transaction.signature,
"instructionId": instruction_id,
"instructionIndex": instruction.instruction_index,
"accounts": accounts,
"logMessages": log_messages,
"eventKind": if is_buy { "buy" } else { "sell" },
"poolAccount": bonding_curve,
"tokenAMint": mint,
"tokenBMint": crate::WSOL_MINT_ID,
"bondingCurve": bonding_curve,
"associatedBondingCurve": associated_bonding_curve,
"associatedUser": associated_user,
"user": user,
"feeRecipient": fee_recipient,
"tokenProgram": token_program,
"creatorVault": creator_vault,
"poolBaseTokenAccount": associated_bonding_curve,
"poolQuoteNativeAccount": bonding_curve,
"amountRaw": amount_raw,
"solLimitRaw": sol_limit_raw,
"tradeSide": if is_buy { "BuyBase" } else { "SellBase" }
});
let event = crate::KbPumpFunTradeDecoded {
transaction_id,
instruction_id,
signature: transaction.signature.clone(),
program_id: program_id.clone(),
trade_side: if is_buy {
crate::KbSwapTradeSide::BuyBase
} else {
crate::KbSwapTradeSide::SellBase
},
mint,
bonding_curve,
associated_bonding_curve,
associated_user,
user,
amount_raw,
sol_limit_raw,
payload_json,
};
if is_buy {
decoded_events.push(crate::KbPumpFunDecodedEvent::BuyTrade(event));
} else {
decoded_events.push(crate::KbPumpFunDecodedEvent::SellTrade(event));
}
continue;
}
if !has_create_v2_log {
continue;
}
if accounts.len() < 6 {
continue;
}
@@ -139,6 +254,59 @@ impl KbPumpFunDecoder {
}
}
fn kb_decode_optional_instruction_data(
data_json: std::option::Option<&std::string::String>,
) -> std::option::Option<std::vec::Vec<u8>> {
let data_json = match data_json {
Some(data_json) => data_json,
None => return None,
};
let parsed_result = serde_json::from_str::<serde_json::Value>(data_json.as_str());
let encoded = match parsed_result {
Ok(parsed) => match parsed.as_str() {
Some(text) => text.to_string(),
None => data_json.clone(),
},
Err(_) => data_json.clone(),
};
let decode_result = bs58::decode(encoded.as_str()).into_vec();
match decode_result {
Ok(decoded) => Some(decoded),
Err(_) => None,
}
}
fn kb_instruction_data_starts_with(
instruction_data: std::option::Option<&[u8]>,
discriminator: &[u8; 8],
) -> bool {
let instruction_data = match instruction_data {
Some(instruction_data) => instruction_data,
None => return false,
};
if instruction_data.len() < discriminator.len() {
return false;
}
&instruction_data[0..discriminator.len()] == discriminator
}
fn kb_extract_u64_argument(
instruction_data: std::option::Option<&[u8]>,
offset: usize,
) -> std::option::Option<std::string::String> {
let instruction_data = match instruction_data {
Some(instruction_data) => instruction_data,
None => return None,
};
let end = offset + 8;
if instruction_data.len() < end {
return None;
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&instruction_data[offset..end]);
Some(u64::from_le_bytes(bytes).to_string())
}
fn kb_extract_log_messages(
transaction_json: &serde_json::Value,
) -> std::vec::Vec<std::string::String> {
@@ -167,10 +335,7 @@ fn kb_extract_log_messages(
messages
}
fn kb_log_messages_contain_keyword(
log_messages: &[std::string::String],
keyword: &str,
) -> bool {
fn kb_log_messages_contain_keyword(log_messages: &[std::string::String], keyword: &str) -> bool {
let keyword_normalized = kb_normalize_log_text(keyword);
for log_message in log_messages {
let log_normalized = kb_normalize_log_text(log_message.as_str());
@@ -306,6 +471,12 @@ mod tests {
);
assert_eq!(event.creator, Some("Creator111".to_string()));
}
crate::KbPumpFunDecodedEvent::BuyTrade(_) => {
panic!("unexpected pump_fun buy trade event");
}
crate::KbPumpFunDecodedEvent::SellTrade(_) => {
panic!("unexpected pump_fun sell trade event");
}
}
}

View File

@@ -1617,9 +1617,130 @@ impl KbDexDecodeService {
}
Ok(fetched)
}
crate::KbPumpFunDecodedEvent::BuyTrade(event) => {
self.persist_pump_fun_trade_event(
transaction,
event,
"pump_fun.buy",
"signal.dex.pump_fun.buy",
"dex.pump_fun.buy",
)
.await
}
crate::KbPumpFunDecodedEvent::SellTrade(event) => {
self.persist_pump_fun_trade_event(
transaction,
event,
"pump_fun.sell",
"signal.dex.pump_fun.sell",
"dex.pump_fun.sell",
)
.await
}
}
}
async fn persist_pump_fun_trade_event(
&self,
transaction: &crate::KbChainTransactionDto,
event: &crate::KbPumpFunTradeDecoded,
event_kind: &str,
signal_kind: &str,
observation_kind: &str,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded pump.fun trade payload: {}",
error
)));
}
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
event_kind,
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::KbDexDecodedEventDto::new(
event.transaction_id,
Some(event.instruction_id),
"pump_fun".to_string(),
event.program_id.clone(),
event_kind.to_string(),
event.bonding_curve.clone(),
None,
event.mint.clone(),
Some(crate::WSOL_MINT_ID.to_string()),
event.associated_bonding_curve.clone(),
payload_json,
);
let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
event_kind,
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::KbError::InvalidState(
"decoded pump.fun trade event disappeared after upsert".to_string(),
));
}
};
if !already_present {
let payload_value = event.payload_json.clone();
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
observation_kind.to_string(),
crate::KbObservationSourceKind::HttpRpc,
transaction.source_endpoint_name.clone(),
transaction.signature.clone(),
transaction.slot,
payload_value.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
signal_kind.to_string(),
crate::KbAnalysisSignalSeverity::Low,
transaction.signature.clone(),
Some(observation_id),
None,
payload_value,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(fetched)
}
async fn persist_pump_swap_event(
&self,
transaction: &crate::KbChainTransactionDto,

View File

@@ -105,6 +105,30 @@ impl KbDexDetectService {
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "pump_fun"
&& decoded_event.event_kind == "pump_fun.buy"
{
let detect_result = self
.detect_pump_fun_trade(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "pump_fun"
&& decoded_event.event_kind == "pump_fun.sell"
{
let detect_result = self
.detect_pump_fun_trade(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "pump_swap"
&& decoded_event.event_kind == "pump_swap.buy"
{
@@ -773,6 +797,243 @@ impl KbDexDetectService {
})
}
async fn detect_pump_fun_trade(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbDexDecodedEventDto,
) -> Result<crate::KbDexPoolDetectionResult, crate::KbError> {
let decoded_event_id_option = decoded_event.id;
let decoded_event_id = match decoded_event_id_option {
Some(decoded_event_id) => decoded_event_id,
None => {
return Err(crate::KbError::InvalidState(
"decoded dex event has no internal id".to_string(),
));
}
};
let dex_id_result = self.ensure_pump_fun_dex().await;
let dex_id = match dex_id_result {
Ok(dex_id) => dex_id,
Err(error) => return Err(error),
};
let pool_address_option = decoded_event.pool_account.clone();
let pool_address = match pool_address_option {
Some(pool_address) => pool_address,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no pool_account",
decoded_event_id
)));
}
};
let token_a_mint_option = decoded_event.token_a_mint.clone();
let token_a_mint = match token_a_mint_option {
Some(token_a_mint) => token_a_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_a_mint",
decoded_event_id
)));
}
};
let token_b_mint_option = decoded_event.token_b_mint.clone();
let token_b_mint = match token_b_mint_option {
Some(token_b_mint) => token_b_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_b_mint",
decoded_event_id
)));
}
};
let base_is_token_a =
kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str());
let base_mint = if base_is_token_a {
token_a_mint.clone()
} else {
token_b_mint.clone()
};
let quote_mint = if base_is_token_a {
token_b_mint.clone()
} else {
token_a_mint.clone()
};
let base_token_id_result = self.ensure_token(base_mint.as_str()).await;
let base_token_id = match base_token_id_result {
Ok(base_token_id) => base_token_id,
Err(error) => return Err(error),
};
let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => return Err(error),
};
let vault_addresses = kb_extract_pump_fun_vault_addresses(&payload_value);
let token_a_vault_address = vault_addresses.0;
let token_b_vault_address = vault_addresses.1;
let base_vault_address = if base_is_token_a {
token_a_vault_address.clone()
} else {
token_b_vault_address.clone()
};
let quote_vault_address = if base_is_token_a {
token_b_vault_address.clone()
} else {
token_a_vault_address.clone()
};
let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await;
let quote_token_id = match quote_token_id_result {
Ok(quote_token_id) => quote_token_id,
Err(error) => return Err(error),
};
let existing_pool_result =
crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await;
let existing_pool_option = match existing_pool_result {
Ok(existing_pool_option) => existing_pool_option,
Err(error) => return Err(error),
};
let created_pool = existing_pool_option.is_none();
let pool_id = match existing_pool_option {
Some(pool) => {
let pool_id_option = pool.id;
match pool_id_option {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
}
}
}
None => {
let pool_dto = crate::KbPoolDto::new(
dex_id,
pool_address.clone(),
crate::KbPoolKind::BondingCurve,
crate::KbPoolStatus::Active,
);
let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await;
match upsert_result {
Ok(pool_id) => pool_id,
Err(error) => return Err(error),
}
}
};
let existing_pair_result =
crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_pair_option = match existing_pair_result {
Ok(existing_pair_option) => existing_pair_option,
Err(error) => return Err(error),
};
let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_dto =
crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol);
let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
let pair_id = match pair_id_result {
Ok(pair_id) => pair_id,
Err(error) => return Err(error),
};
let upsert_base_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
base_token_id,
crate::KbPoolTokenRole::Base,
base_vault_address,
Some(0),
),
)
.await;
if let Err(error) = upsert_base_pool_token_result {
return Err(error);
}
let upsert_quote_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Quote,
quote_vault_address,
Some(1),
),
)
.await;
if let Err(error) = upsert_quote_pool_token_result {
return Err(error);
}
let existing_listing_result =
crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_listing_option = match existing_listing_result {
Ok(existing_listing_option) => existing_listing_option,
Err(error) => return Err(error),
};
let created_listing = existing_listing_option.is_none();
let pool_listing_id = match existing_listing_option {
Some(pool_listing) => pool_listing.id,
None => {
let listing_id_result = self
.upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction)
.await;
match listing_id_result {
Ok(listing_id) => Some(listing_id),
Err(error) => return Err(error),
}
}
};
if created_pool {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.pump_fun.new_pool",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_pair {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.pump_fun.new_pair",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_listing {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.pump_fun.first_listing_seen",
crate::KbAnalysisSignalSeverity::Low,
payload_value,
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(crate::KbDexPoolDetectionResult {
decoded_event_id,
dex_id,
pool_id,
pair_id,
pool_listing_id,
created_pool,
created_pair,
created_listing,
})
}
async fn detect_pump_swap_trade(
&self,
transaction: &crate::KbChainTransactionDto,
@@ -905,13 +1166,8 @@ impl KbDexDetectService {
};
let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_dto = crate::KbPairDto::new(
dex_id,
pool_id,
base_token_id,
quote_token_id,
pair_symbol,
);
let pair_dto =
crate::KbPairDto::new(dex_id, pool_id, base_token_id, quote_token_id, pair_symbol);
let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
let pair_id = match pair_id_result {
Ok(pair_id) => pair_id,
@@ -2862,6 +3118,27 @@ fn kb_extract_string_from_array_index(
Some(text.to_string())
}
fn kb_extract_pump_fun_vault_addresses(
payload_value: &serde_json::Value,
) -> (
std::option::Option<std::string::String>,
std::option::Option<std::string::String>,
) {
let accounts_option = payload_value.get("accounts");
let accounts = match accounts_option {
Some(accounts) => accounts,
None => return (None, None),
};
let accounts_array_option = accounts.as_array();
let accounts_array = match accounts_array_option {
Some(accounts_array) => accounts_array,
None => return (None, None),
};
let token_a_vault_address = kb_extract_string_from_array_index(accounts_array, 4);
let token_b_native_address = kb_extract_string_from_array_index(accounts_array, 3);
(token_a_vault_address, token_b_native_address)
}
fn kb_extract_pump_swap_vault_addresses(
payload_value: &serde_json::Value,
) -> (

View File

@@ -49,82 +49,6 @@ pub use config::KbSolanaConfig;
pub use config::KbSqliteDatabaseConfig;
pub use config::KbWsEndpointConfig;
pub use constants::*;
pub use db::KbAnalysisSignalDto;
pub use db::KbAnalysisSignalEntity;
pub use db::KbAnalysisSignalSeverity;
pub use db::KbChainInstructionDto;
pub use db::KbChainInstructionEntity;
pub use db::KbChainSlotDto;
pub use db::KbChainSlotEntity;
pub use db::KbChainTransactionDto;
pub use db::KbChainTransactionEntity;
pub use db::KbDatabase;
pub use db::KbDatabaseBackend;
pub use db::KbDatabaseConnection;
pub use db::KbDbMetadataDto;
pub use db::KbDbMetadataEntity;
pub use db::KbDbRuntimeEventDto;
pub use db::KbDbRuntimeEventEntity;
pub use db::KbDbRuntimeEventLevel;
pub use db::KbDexDecodedEventDto;
pub use db::KbDexDecodedEventEntity;
pub use db::KbDexDto;
pub use db::KbDexEntity;
pub use db::KbKnownHttpEndpointDto;
pub use db::KbKnownHttpEndpointEntity;
pub use db::KbKnownWsEndpointDto;
pub use db::KbKnownWsEndpointEntity;
pub use db::KbLaunchAttributionDto;
pub use db::KbLaunchAttributionEntity;
pub use db::KbLaunchSurfaceDto;
pub use db::KbLaunchSurfaceEntity;
pub use db::KbLaunchSurfaceKeyDto;
pub use db::KbLaunchSurfaceKeyEntity;
pub use db::KbLiquidityEventDto;
pub use db::KbLiquidityEventEntity;
pub use db::KbLiquidityEventKind;
pub use db::KbObservationSourceKind;
pub use db::KbObservedTokenDto;
pub use db::KbObservedTokenEntity;
pub use db::KbObservedTokenStatus;
pub use db::KbOnchainObservationDto;
pub use db::KbOnchainObservationEntity;
pub use db::KbPairAnalyticSignalDto;
pub use db::KbPairAnalyticSignalEntity;
pub use db::KbPairCandleDto;
pub use db::KbPairCandleEntity;
pub use db::KbPairDto;
pub use db::KbPairEntity;
pub use db::KbPairMetricDto;
pub use db::KbPairMetricEntity;
pub use db::KbPoolDto;
pub use db::KbPoolEntity;
pub use db::KbPoolKind;
pub use db::KbPoolListingDto;
pub use db::KbPoolListingEntity;
pub use db::KbPoolOriginDto;
pub use db::KbPoolOriginEntity;
pub use db::KbPoolStatus;
pub use db::KbPoolTokenDto;
pub use db::KbPoolTokenEntity;
pub use db::KbPoolTokenRole;
pub use db::KbSwapDto;
pub use db::KbSwapEntity;
pub use db::KbSwapTradeSide;
pub use db::KbTokenBurnEventDto;
pub use db::KbTokenBurnEventEntity;
pub use db::KbTokenDto;
pub use db::KbTokenEntity;
pub use db::KbTokenMintEventDto;
pub use db::KbTokenMintEventEntity;
pub use db::KbTradeEventDto;
pub use db::KbTradeEventEntity;
pub use db::KbWalletDto;
pub use db::KbWalletEntity;
pub use db::KbWalletHoldingDto;
pub use db::KbWalletHoldingEntity;
pub use db::KbWalletParticipationDto;
pub use db::KbWalletParticipationEntity;
pub use db::delete_chain_instructions_by_transaction_id;
pub use db::get_chain_slot;
pub use db::get_chain_transaction_by_signature;
@@ -215,6 +139,82 @@ pub use db::upsert_trade_event;
pub use db::upsert_wallet;
pub use db::upsert_wallet_holding;
pub use db::upsert_wallet_participation;
pub use db::KbAnalysisSignalDto;
pub use db::KbAnalysisSignalEntity;
pub use db::KbAnalysisSignalSeverity;
pub use db::KbChainInstructionDto;
pub use db::KbChainInstructionEntity;
pub use db::KbChainSlotDto;
pub use db::KbChainSlotEntity;
pub use db::KbChainTransactionDto;
pub use db::KbChainTransactionEntity;
pub use db::KbDatabase;
pub use db::KbDatabaseBackend;
pub use db::KbDatabaseConnection;
pub use db::KbDbMetadataDto;
pub use db::KbDbMetadataEntity;
pub use db::KbDbRuntimeEventDto;
pub use db::KbDbRuntimeEventEntity;
pub use db::KbDbRuntimeEventLevel;
pub use db::KbDexDecodedEventDto;
pub use db::KbDexDecodedEventEntity;
pub use db::KbDexDto;
pub use db::KbDexEntity;
pub use db::KbKnownHttpEndpointDto;
pub use db::KbKnownHttpEndpointEntity;
pub use db::KbKnownWsEndpointDto;
pub use db::KbKnownWsEndpointEntity;
pub use db::KbLaunchAttributionDto;
pub use db::KbLaunchAttributionEntity;
pub use db::KbLaunchSurfaceDto;
pub use db::KbLaunchSurfaceEntity;
pub use db::KbLaunchSurfaceKeyDto;
pub use db::KbLaunchSurfaceKeyEntity;
pub use db::KbLiquidityEventDto;
pub use db::KbLiquidityEventEntity;
pub use db::KbLiquidityEventKind;
pub use db::KbObservationSourceKind;
pub use db::KbObservedTokenDto;
pub use db::KbObservedTokenEntity;
pub use db::KbObservedTokenStatus;
pub use db::KbOnchainObservationDto;
pub use db::KbOnchainObservationEntity;
pub use db::KbPairAnalyticSignalDto;
pub use db::KbPairAnalyticSignalEntity;
pub use db::KbPairCandleDto;
pub use db::KbPairCandleEntity;
pub use db::KbPairDto;
pub use db::KbPairEntity;
pub use db::KbPairMetricDto;
pub use db::KbPairMetricEntity;
pub use db::KbPoolDto;
pub use db::KbPoolEntity;
pub use db::KbPoolKind;
pub use db::KbPoolListingDto;
pub use db::KbPoolListingEntity;
pub use db::KbPoolOriginDto;
pub use db::KbPoolOriginEntity;
pub use db::KbPoolStatus;
pub use db::KbPoolTokenDto;
pub use db::KbPoolTokenEntity;
pub use db::KbPoolTokenRole;
pub use db::KbSwapDto;
pub use db::KbSwapEntity;
pub use db::KbSwapTradeSide;
pub use db::KbTokenBurnEventDto;
pub use db::KbTokenBurnEventEntity;
pub use db::KbTokenDto;
pub use db::KbTokenEntity;
pub use db::KbTokenMintEventDto;
pub use db::KbTokenMintEventEntity;
pub use db::KbTradeEventDto;
pub use db::KbTradeEventEntity;
pub use db::KbWalletDto;
pub use db::KbWalletEntity;
pub use db::KbWalletHoldingDto;
pub use db::KbWalletHoldingEntity;
pub use db::KbWalletParticipationDto;
pub use db::KbWalletParticipationEntity;
pub use detect::KbDetectionObservationInput;
pub use detect::KbDetectionPersistenceService;
pub use detect::KbDetectionPoolCandidateInput;
@@ -227,15 +227,6 @@ pub use detect::KbSolanaWsDetectionService;
pub use detect::KbWsDetectionNotificationEnvelope;
pub use detect::KbWsDetectionRelay;
pub use detect::KbWsDetectionRelayStats;
pub use dex::KB_DEXLAB_PROGRAM_ID;
pub use dex::KB_FLUXBEAM_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use dex::KB_METEORA_DBC_PROGRAM_ID;
pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use dex::KB_PUMP_FUN_PROGRAM_ID;
pub use dex::KB_PUMP_SWAP_PROGRAM_ID;
pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use dex::KbDexlabCreatePoolDecoded;
pub use dex::KbDexlabDecodedEvent;
pub use dex::KbDexlabDecoder;
@@ -263,16 +254,28 @@ pub use dex::KbOrcaWhirlpoolsSwapDecoded;
pub use dex::KbPumpFunCreateV2TokenDecoded;
pub use dex::KbPumpFunDecodedEvent;
pub use dex::KbPumpFunDecoder;
pub use dex::KbPumpFunTradeDecoded;
pub use dex::KbPumpSwapDecodedEvent;
pub use dex::KbPumpSwapDecoder;
pub use dex::KbPumpSwapTradeDecoded;
pub use dex::KbRaydiumAmmV4DecodedEvent;
pub use dex::KbRaydiumAmmV4Decoder;
pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded;
pub use dex::KB_DEXLAB_PROGRAM_ID;
pub use dex::KB_FLUXBEAM_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V1_PROGRAM_ID;
pub use dex::KB_METEORA_DAMM_V2_PROGRAM_ID;
pub use dex::KB_METEORA_DBC_PROGRAM_ID;
pub use dex::KB_ORCA_WHIRLPOOLS_PROGRAM_ID;
pub use dex::KB_PUMP_FUN_PROGRAM_ID;
pub use dex::KB_PUMP_SWAP_PROGRAM_ID;
pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID;
pub use dex_decode::KbDexDecodeService;
pub use dex_detect::KbDexDetectService;
pub use dex_detect::KbDexPoolDetectionResult;
pub use error::KbError;
pub use http_client::parse_kb_json_rpc_http_response_text;
pub use http_client::parse_kb_json_rpc_http_response_value;
pub use http_client::HttpClient;
pub use http_client::KbHttpEndpointStatus;
pub use http_client::KbHttpMethodClass;
@@ -281,10 +284,11 @@ pub use http_client::KbJsonRpcHttpErrorResponse;
pub use http_client::KbJsonRpcHttpRequest;
pub use http_client::KbJsonRpcHttpResponse;
pub use http_client::KbJsonRpcHttpSuccessResponse;
pub use http_client::parse_kb_json_rpc_http_response_text;
pub use http_client::parse_kb_json_rpc_http_response_value;
pub use http_pool::HttpEndpointPool;
pub use http_pool::KbHttpPoolClientSnapshot;
pub use json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use json_rpc_ws::KbJsonRpcWsErrorObject;
pub use json_rpc_ws::KbJsonRpcWsErrorResponse;
pub use json_rpc_ws::KbJsonRpcWsIncomingMessage;
@@ -292,9 +296,6 @@ pub use json_rpc_ws::KbJsonRpcWsNotification;
pub use json_rpc_ws::KbJsonRpcWsNotificationParams;
pub use json_rpc_ws::KbJsonRpcWsRequest;
pub use json_rpc_ws::KbJsonRpcWsSuccessResponse;
pub use json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use launch_origin::KbLaunchAttributionResult;
pub use launch_origin::KbLaunchOriginService;
pub use pair_analytic_signal::KbPairAnalyticSignalResult;
@@ -304,14 +305,14 @@ pub use pair_candle_aggregation::KbPairCandleAggregationService;
pub use pair_candle_query::KbPairCandleQueryService;
pub use pool_origin::KbPoolOriginResult;
pub use pool_origin::KbPoolOriginService;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use token_backfill::KbPoolBackfillResult;
pub use token_backfill::KbTokenBackfillResult;
pub use token_backfill::KbTokenBackfillService;
pub use tracing::KbTracingGuard;
pub use tracing::init_tracing;
pub use tracing::KbTracingGuard;
pub use trade_aggregation::KbTradeAggregationResult;
pub use trade_aggregation::KbTradeAggregationService;
pub use tx_model::KbTransactionModelService;

View File

@@ -174,7 +174,7 @@ mod tests {
};
std::sync::Arc::new(database)
}
async fn seed_fluxbeam_swap_transaction(
database: std::sync::Arc<crate::KbDatabase>,
signature: &str,
@@ -346,7 +346,7 @@ mod tests {
seed_fluxbeam_swap_transaction(
database.clone(),
"sig-pair-candle-fallback-2",
1_700_020_020,
1_700_020_010,
"1000",
"3000",
)
@@ -389,5 +389,4 @@ mod tests {
assert_eq!(candles[0].close_price_quote_per_base, 3.0);
assert_eq!(candles[0].trade_count, 2);
}
}

View File

@@ -203,6 +203,31 @@ impl KbTradeAggregationService {
price_quote_per_base = inferred.2;
}
}
if decoded_event.event_kind.starts_with("pump_fun.")
&& (base_amount_raw.is_none()
|| quote_amount_raw.is_none()
|| price_quote_per_base.is_none())
{
let inferred_result = kb_extract_pump_fun_amounts_from_transaction(
transaction.transaction_json.as_str(),
transaction.meta_json.as_deref(),
base_vault_address.as_deref(),
quote_vault_address.as_deref(),
);
let inferred = match inferred_result {
Ok(inferred) => inferred,
Err(error) => return Err(error),
};
if base_amount_raw.is_none() {
base_amount_raw = inferred.0;
}
if quote_amount_raw.is_none() {
quote_amount_raw = inferred.1;
}
if price_quote_per_base.is_none() {
price_quote_per_base = inferred.2;
}
}
if price_quote_per_base.is_none() {
price_quote_per_base = kb_compute_price_quote_per_base_with_decimals(
transaction.meta_json.as_deref(),
@@ -211,6 +236,12 @@ impl KbTradeAggregationService {
quote_vault_address.as_deref(),
);
}
if price_quote_per_base.is_none() {
price_quote_per_base = kb_compute_price_quote_per_base_from_raw_amounts(
base_amount_raw.as_deref(),
quote_amount_raw.as_deref(),
);
}
let slot_i64 = kb_convert_slot_to_i64(transaction.slot);
let created_trade_event = existing_trade_option.is_none();
let trade_event_dto = crate::KbTradeEventDto::new(
@@ -637,6 +668,146 @@ fn kb_extract_pump_swap_amounts_from_transaction(
Ok((base_amount_raw, quote_amount_raw, price_quote_per_base))
}
fn kb_extract_pump_fun_amounts_from_transaction(
transaction_json: &str,
meta_json: std::option::Option<&str>,
base_vault_address: std::option::Option<&str>,
quote_native_address: std::option::Option<&str>,
) -> Result<
(
std::option::Option<std::string::String>,
std::option::Option<std::string::String>,
std::option::Option<f64>,
),
crate::KbError,
> {
let meta_json = match meta_json {
Some(meta_json) => meta_json,
None => return Ok((None, None, None)),
};
let transaction_value_result = serde_json::from_str::<serde_json::Value>(transaction_json);
let transaction_value = match transaction_value_result {
Ok(transaction_value) => transaction_value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse transaction_json for pump_fun amount extraction: {}",
error
)));
}
};
let meta_value_result = serde_json::from_str::<serde_json::Value>(meta_json);
let meta_value = match meta_value_result {
Ok(meta_value) => meta_value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse meta_json for pump_fun amount extraction: {}",
error
)));
}
};
let account_keys_result = kb_extract_transaction_account_keys(&transaction_value);
let account_keys = match account_keys_result {
Ok(account_keys) => account_keys,
Err(error) => return Err(error),
};
let pre_balances_result =
kb_extract_token_balance_map(&meta_value, &account_keys, "preTokenBalances");
let pre_balances = match pre_balances_result {
Ok(pre_balances) => pre_balances,
Err(error) => return Err(error),
};
let post_balances_result =
kb_extract_token_balance_map(&meta_value, &account_keys, "postTokenBalances");
let post_balances = match post_balances_result {
Ok(post_balances) => post_balances,
Err(error) => return Err(error),
};
let mut base_amount_raw = None;
let mut quote_amount_raw = None;
let mut price_quote_per_base = None;
let mut base_delta_ui = None;
if let Some(base_vault_address) = base_vault_address {
let base_pre = pre_balances.get(base_vault_address);
let base_post = post_balances.get(base_vault_address);
let base_pre_raw = base_pre.map(|value| value.0.clone());
let base_post_raw = base_post.map(|value| value.0.clone());
base_amount_raw = kb_compute_amount_delta_abs(base_pre_raw, base_post_raw);
let base_pre_ui = base_pre.and_then(|value| value.1);
let base_post_ui = base_post.and_then(|value| value.1);
base_delta_ui = kb_compute_ui_delta_abs(base_pre_ui, base_post_ui);
}
if let Some(quote_native_address) = quote_native_address {
let quote_delta_result = kb_extract_native_balance_delta_by_address(
&meta_value,
&account_keys,
quote_native_address,
);
let quote_delta = match quote_delta_result {
Ok(quote_delta) => quote_delta,
Err(error) => return Err(error),
};
if let Some(quote_delta_lamports) = quote_delta {
quote_amount_raw = Some(quote_delta_lamports.to_string());
let quote_delta_ui = quote_delta_lamports as f64 / 1_000_000_000.0;
if let Some(base_delta_ui) = base_delta_ui {
if base_delta_ui > 0.0 {
price_quote_per_base = Some(quote_delta_ui / base_delta_ui);
}
}
}
}
Ok((base_amount_raw, quote_amount_raw, price_quote_per_base))
}
fn kb_extract_native_balance_delta_by_address(
meta_value: &serde_json::Value,
account_keys: &[std::string::String],
address: &str,
) -> Result<std::option::Option<u64>, crate::KbError> {
let mut account_index = None;
for (index, account_key) in account_keys.iter().enumerate() {
if account_key.as_str() == address {
account_index = Some(index);
break;
}
}
let account_index = match account_index {
Some(account_index) => account_index,
None => return Ok(None),
};
let pre_balances_option = meta_value
.get("preBalances")
.and_then(|value| value.as_array());
let post_balances_option = meta_value
.get("postBalances")
.and_then(|value| value.as_array());
let pre_balances = match pre_balances_option {
Some(pre_balances) => pre_balances,
None => return Ok(None),
};
let post_balances = match post_balances_option {
Some(post_balances) => post_balances,
None => return Ok(None),
};
if account_index >= pre_balances.len() || account_index >= post_balances.len() {
return Ok(None);
}
let pre_balance_option = pre_balances[account_index].as_u64();
let post_balance_option = post_balances[account_index].as_u64();
let pre_balance = match pre_balance_option {
Some(pre_balance) => pre_balance,
None => return Ok(None),
};
let post_balance = match post_balance_option {
Some(post_balance) => post_balance,
None => return Ok(None),
};
if post_balance >= pre_balance {
return Ok(Some(post_balance - pre_balance));
}
Ok(Some(pre_balance - post_balance))
}
fn kb_extract_transaction_account_keys(
transaction_value: &serde_json::Value,
) -> Result<std::vec::Vec<std::string::String>, crate::KbError> {
@@ -792,6 +963,37 @@ fn kb_compute_ui_delta_abs(
Some(delta)
}
fn kb_compute_price_quote_per_base_from_raw_amounts(
base_amount_raw: std::option::Option<&str>,
quote_amount_raw: std::option::Option<&str>,
) -> std::option::Option<f64> {
let base_amount_raw = match base_amount_raw {
Some(base_amount_raw) => base_amount_raw.trim(),
None => return None,
};
let quote_amount_raw = match quote_amount_raw {
Some(quote_amount_raw) => quote_amount_raw.trim(),
None => return None,
};
if base_amount_raw.is_empty() || quote_amount_raw.is_empty() {
return None;
}
let base_amount_result = base_amount_raw.parse::<f64>();
let base_amount = match base_amount_result {
Ok(base_amount) => base_amount,
Err(_) => return None,
};
let quote_amount_result = quote_amount_raw.parse::<f64>();
let quote_amount = match quote_amount_result {
Ok(quote_amount) => quote_amount,
Err(_) => return None,
};
if base_amount <= 0.0 {
return None;
}
Some(quote_amount / base_amount)
}
fn kb_compute_price_quote_per_base_with_decimals(
meta_json: std::option::Option<&str>,
transaction_json: &str,