This commit is contained in:
2026-06-14 14:25:09 +02:00
parent 38f42da970
commit 3b908b318e
100 changed files with 5873 additions and 225 deletions

View File

@@ -66,6 +66,7 @@ pub use pump_fun::PumpFunDecodedEvent;
pub use pump_fun::PumpFunDecoder;
pub use pump_fun::PumpFunTradeDecoded;
pub use pump_swap::PumpSwapDecodedEvent;
pub use pump_swap::PumpSwapInstructionDecoded;
pub use pump_swap::PumpSwapDecoder;
pub use pump_swap::PumpSwapTradeDecoded;
pub use raydium_amm_v4::RaydiumAmmV4DecodedEvent;

File diff suppressed because it is too large Load Diff

View File

@@ -82,7 +82,8 @@ impl DexDecodeService {
}
let append_result = append_persisted_events_result(
&mut persisted,
self.decode_and_persist_raydium_stable_swap_events(&transaction, &instructions).await,
self.decode_and_persist_raydium_stable_swap_events(&transaction, &instructions)
.await,
);
if let Err(error) = append_result {
return Err(error);
@@ -1679,6 +1680,17 @@ impl DexDecodeService {
)
.await;
},
crate::PumpSwapDecodedEvent::BuyExactQuoteInTrade(event) => {
return self
.persist_pump_swap_trade_event(
transaction,
event,
"pump_swap.buy_exact_quote_in",
"signal.dex.pump_swap.buy_exact_quote_in",
"dex.pump_swap.buy_exact_quote_in",
)
.await;
},
crate::PumpSwapDecodedEvent::SellTrade(event) => {
return self
.persist_pump_swap_trade_event(
@@ -1690,9 +1702,35 @@ impl DexDecodeService {
)
.await;
},
crate::PumpSwapDecodedEvent::Instruction(event) => {
return self.persist_pump_swap_instruction_event(transaction, event).await;
},
}
}
async fn persist_pump_swap_instruction_event(
&self,
transaction: &crate::ChainTransactionDto,
event: &crate::PumpSwapInstructionDecoded,
) -> Result<crate::DexDecodedEventDto, crate::Error> {
return self
.materialize_named_dex_event(
transaction,
event.transaction_id,
event.instruction_id,
"pump_swap",
event.program_id.clone(),
event.event_kind.as_str(),
event.pool_account.clone(),
None,
event.token_a_mint.clone(),
event.token_b_mint.clone(),
event.lp_mint.clone(),
event.payload_json.clone(),
)
.await;
}
async fn persist_pump_swap_trade_event(
&self,
transaction: &crate::ChainTransactionDto,
@@ -1781,18 +1819,16 @@ impl DexDecodeService {
transaction: &crate::ChainTransactionDto,
instructions: &[crate::ChainInstructionDto],
) -> Result<std::vec::Vec<crate::DexDecodedEventDto>, crate::Error> {
let decoded_result = self
.raydium_stable_swap_decoder
.decode_transaction(transaction, instructions);
let decoded_result =
self.raydium_stable_swap_decoder.decode_transaction(transaction, instructions);
let decoded_events = match decoded_result {
Ok(decoded_events) => decoded_events,
Err(error) => return Err(error),
};
let mut persisted = std::vec::Vec::new();
for decoded_event in &decoded_events {
let persist_result = self
.persist_raydium_stable_swap_event(transaction, decoded_event)
.await;
let persist_result =
self.persist_raydium_stable_swap_event(transaction, decoded_event).await;
let persisted_event = match persist_result {
Ok(persisted_event) => persisted_event,
Err(error) => return Err(error),
@@ -3686,7 +3722,10 @@ fn insert_raydium_mapped_amounts(
);
}
if let Some(open_time) = read_u64_le_from_bytes(data, 2) {
object.insert("openTime".to_string(), serde_json::Value::String(open_time.to_string()));
object.insert(
"openTime".to_string(),
serde_json::Value::String(open_time.to_string()),
);
}
},
RaydiumMappedNonTradeAmountLayout::AmmV4Initialize2 => {
@@ -3697,7 +3736,10 @@ fn insert_raydium_mapped_amounts(
);
}
if let Some(open_time) = read_u64_le_from_bytes(data, 2) {
object.insert("openTime".to_string(), serde_json::Value::String(open_time.to_string()));
object.insert(
"openTime".to_string(),
serde_json::Value::String(open_time.to_string()),
);
}
if let Some(init_pc_amount) = read_u64_le_from_bytes(data, 10) {
object.insert(
@@ -3762,7 +3804,10 @@ fn insert_raydium_mapped_amounts(
);
}
if let Some(base_side) = read_u64_le_from_bytes(data, 17) {
object.insert("baseSide".to_string(), serde_json::Value::String(base_side.to_string()));
object.insert(
"baseSide".to_string(),
serde_json::Value::String(base_side.to_string()),
);
}
if let Some(other_amount_min) = read_u64_le_from_bytes(data, 25) {
object.insert(
@@ -3773,8 +3818,14 @@ fn insert_raydium_mapped_amounts(
},
RaydiumMappedNonTradeAmountLayout::AmmV4Withdraw => {
if let Some(lp_amount) = read_u64_le_from_bytes(data, 1) {
object.insert("lpAmountRaw".to_string(), serde_json::Value::String(lp_amount.to_string()));
object.insert("liquidity".to_string(), serde_json::Value::String(lp_amount.to_string()));
object.insert(
"lpAmountRaw".to_string(),
serde_json::Value::String(lp_amount.to_string()),
);
object.insert(
"liquidity".to_string(),
serde_json::Value::String(lp_amount.to_string()),
);
}
if let Some(min_coin_amount) = read_u64_le_from_bytes(data, 9) {
object.insert(
@@ -3797,7 +3848,10 @@ fn insert_raydium_mapped_amounts(
);
}
if let Some(value) = read_u64_le_from_bytes(data, 2) {
object.insert("configValue".to_string(), serde_json::Value::String(value.to_string()));
object.insert(
"configValue".to_string(),
serde_json::Value::String(value.to_string()),
);
}
if let Some(last_order_denominator) = read_u64_le_from_bytes(data, 10) {
object.insert(
@@ -3808,7 +3862,8 @@ fn insert_raydium_mapped_amounts(
},
RaydiumMappedNonTradeAmountLayout::AmmV4WithdrawSrm => {
if let Some(amount) = read_u64_le_from_bytes(data, 1) {
object.insert("amountRaw".to_string(), serde_json::Value::String(amount.to_string()));
object
.insert("amountRaw".to_string(), serde_json::Value::String(amount.to_string()));
}
},
RaydiumMappedNonTradeAmountLayout::AmmV4PreInitialize => {
@@ -3833,10 +3888,16 @@ fn insert_raydium_mapped_amounts(
);
}
if let Some(amount_in) = read_u64_le_from_bytes(data, 2) {
object.insert("amountIn".to_string(), serde_json::Value::String(amount_in.to_string()));
object.insert(
"amountIn".to_string(),
serde_json::Value::String(amount_in.to_string()),
);
}
if let Some(amount_out) = read_u64_le_from_bytes(data, 10) {
object.insert("amountOutOrMinimumAmountOut".to_string(), serde_json::Value::String(amount_out.to_string()));
object.insert(
"amountOutOrMinimumAmountOut".to_string(),
serde_json::Value::String(amount_out.to_string()),
);
}
},
RaydiumMappedNonTradeAmountLayout::AmmV4AdminCancelOrders => {
@@ -4078,7 +4139,8 @@ fn build_meteora_instruction_audit_payload(
};
let data_base58 = parse_instruction_data_base58(instruction.data_json.as_deref());
let data_bytes = instruction_data_bytes_from_base58(data_base58.as_deref());
let discriminator_hex = raydium_instruction_discriminator_hex(protocol_name, data_bytes.as_deref(), 0);
let discriminator_hex =
raydium_instruction_discriminator_hex(protocol_name, data_bytes.as_deref(), 0);
let anchor_self_cpi_log =
discriminator_hex.as_deref() == Some(METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX);
let anchor_event_discriminator_hex = if anchor_self_cpi_log {
@@ -4617,7 +4679,8 @@ fn build_raydium_instruction_audit_payload(
};
let data_base58 = parse_instruction_data_base58(instruction.data_json.as_deref());
let data_bytes = instruction_data_bytes_from_base58(data_base58.as_deref());
let discriminator_hex = raydium_instruction_discriminator_hex(protocol_name, data_bytes.as_deref(), 0);
let discriminator_hex =
raydium_instruction_discriminator_hex(protocol_name, data_bytes.as_deref(), 0);
let anchor_self_cpi_log =
discriminator_hex.as_deref() == Some(METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX);
let anchor_event_discriminator_hex = if anchor_self_cpi_log {
@@ -5266,6 +5329,8 @@ fn prepare_pump_swap_trade_payload_for_classification(
};
if event.pool_account.is_some() && event.token_a_mint.is_some() && event.token_b_mint.is_some()
{
object.insert("tradeCandidate".to_string(), serde_json::Value::Bool(true));
object.insert("candleCandidate".to_string(), serde_json::Value::Bool(true));
return serde_json::Value::Object(object);
}
object.insert("tradeCandidate".to_string(), serde_json::Value::Bool(false));

View File

@@ -127,6 +127,14 @@ pub(crate) fn dex_detection_route(
}
return Some(crate::dex_detection_route::DexDetectionRoute::PumpSwapTrade);
},
("pump_swap", "pump_swap.buy_exact_quote_in") => {
if crate::dex_detection_route::is_incomplete_pump_swap_decoded_event(decoded_event) {
return Some(
crate::dex_detection_route::DexDetectionRoute::SkipIncompletePumpSwapTrade,
);
}
return Some(crate::dex_detection_route::DexDetectionRoute::PumpSwapTrade);
},
("pump_swap", "pump_swap.sell") => {
if crate::dex_detection_route::is_incomplete_pump_swap_decoded_event(decoded_event) {
return Some(
@@ -301,6 +309,47 @@ mod tests {
));
}
#[test]
fn pump_swap_buy_exact_quote_in_routes_to_pool_detection_when_complete() {
let event = make_decoded_event(
"pump_swap",
"pump_swap.buy_exact_quote_in",
Some("PumpSwapPool111"),
Some("TokenA111"),
Some("TokenB111"),
);
let route_option = crate::dex_detection_route::dex_detection_route(&event);
let route = match route_option {
Some(route) => route,
None => panic!("route must be selected"),
};
assert_eq!(route, crate::dex_detection_route::DexDetectionRoute::PumpSwapTrade);
assert!(crate::dex_detection_route::dex_detection_route_requires_full_pool_context(
route
));
}
#[test]
fn pump_swap_buy_exact_quote_in_incomplete_route_is_skipped() {
let event = make_decoded_event(
"pump_swap",
"pump_swap.buy_exact_quote_in",
Some("PumpSwapPool111"),
Some("TokenA111"),
None,
);
let route_option = crate::dex_detection_route::dex_detection_route(&event);
let route = match route_option {
Some(route) => route,
None => panic!("route must be selected"),
};
assert_eq!(
route,
crate::dex_detection_route::DexDetectionRoute::SkipIncompletePumpSwapTrade
);
}
#[test]
fn pump_fun_create_token_route_does_not_require_full_pool_context() {
let event = make_decoded_event(

View File

@@ -329,6 +329,12 @@ pub fn is_dex_trade_event_kind(event_kind: &str) -> bool {
if event_kind.ends_with(".sell") {
return true;
}
if event_kind.contains(".buy_exact") {
return true;
}
if event_kind.contains(".sell_exact") {
return true;
}
if event_kind.ends_with(".swap") {
return true;
}
@@ -484,6 +490,12 @@ pub fn is_dex_fee_event_kind(event_kind: &str) -> bool {
if event_kind.contains("collect_creator_fee") {
return true;
}
if event_kind.contains("collect_coin_creator_fee") {
return true;
}
if event_kind.contains("transfer_creator_fees") {
return true;
}
if event_kind.contains("collect_protocol_fee") {
return true;
}
@@ -519,6 +531,18 @@ pub fn is_dex_reward_event_kind(event_kind: &str) -> bool {
if event_kind.contains("emission") {
return true;
}
if event_kind.contains("incentive") {
return true;
}
if event_kind.contains("toggle_cashback_enabled") {
return false;
}
if event_kind.contains("cashback") {
return true;
}
if event_kind.contains("volume_accumulator") {
return true;
}
return false;
}
@@ -657,6 +681,9 @@ pub fn is_dex_migration_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".migrate_to_open_book") {
return false;
}
if event_kind.contains(".migrate_pool_coin_creator") {
return false;
}
if event_kind.contains(".migrate") {
return true;
}
@@ -759,6 +786,9 @@ pub fn is_dex_admin_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".migrate_to_open_book") {
return false;
}
if event_kind.contains(".migrate_pool_coin_creator") {
return true;
}
if event_kind.contains(".close_platform_global_access") {
return true;
}
@@ -789,6 +819,15 @@ pub fn is_dex_admin_event_kind(event_kind: &str) -> bool {
if event_kind.contains("set_") {
return true;
}
if event_kind.contains("toggle_") {
return true;
}
if event_kind.contains(".disable") {
return true;
}
if event_kind.contains(".extend_account") {
return true;
}
if event_kind.contains("update_") {
return true;
}
@@ -1154,6 +1193,19 @@ mod tests {
super::classify_dex_event_category_code("raydium_clmm.set_reward_params"),
"reward"
);
assert_eq!(
super::classify_dex_event_category_code("pump_swap.toggle_cashback_enabled"),
"admin"
);
assert!(!super::is_dex_reward_event_kind("pump_swap.toggle_cashback_enabled"));
assert!(super::is_dex_admin_event_kind("pump_swap.toggle_cashback_enabled"));
assert_eq!(
super::classify_dex_event_category_code("pump_swap.migrate_pool_coin_creator"),
"admin"
);
assert!(!super::is_dex_pool_lifecycle_event_kind(
"pump_swap.migrate_pool_coin_creator"
));
assert_eq!(
super::classify_dex_event_category_code("raydium_clmm.increase_liquidity_v2"),
"liquidity"

View File

@@ -89,6 +89,16 @@ impl DexEventCoverageService {
Ok(sync_counts) => sync_counts,
Err(error) => return Err(error),
};
let cleanup_result =
self.cleanup_deprecated_pump_swap_observed_unknown_rows(&decoder_code).await;
if let Err(error) = cleanup_result {
return Err(error);
}
let duplicate_cleanup_result =
self.cleanup_duplicate_pump_swap_logical_coverage_rows(&decoder_code).await;
if let Err(error) = duplicate_cleanup_result {
return Err(error);
}
let refreshed_entry_count = match &decoder_code {
Some(decoder_code) => {
let refresh_result =
@@ -139,6 +149,16 @@ impl DexEventCoverageService {
Ok(sync_counts) => sync_counts,
Err(error) => return Err(error),
};
let cleanup_result =
self.cleanup_deprecated_pump_swap_observed_unknown_rows(&decoder_code).await;
if let Err(error) = cleanup_result {
return Err(error);
}
let duplicate_cleanup_result =
self.cleanup_duplicate_pump_swap_logical_coverage_rows(&decoder_code).await;
if let Err(error) = duplicate_cleanup_result {
return Err(error);
}
let refreshed_entry_count = match &decoder_code {
Some(decoder_code) => {
let refresh_result =
@@ -178,6 +198,86 @@ impl DexEventCoverageService {
summaries,
});
}
async fn cleanup_deprecated_pump_swap_observed_unknown_rows(
&self,
decoder_code: &std::option::Option<std::string::String>,
) -> Result<u64, crate::Error> {
if let Some(decoder_code) = decoder_code {
if decoder_code != "pump_swap" {
return Ok(0);
}
}
match self.database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
DELETE FROM k_sol_dex_event_coverage_entries
WHERE decoder_code = 'pump_swap'
AND (
entry_name LIKE 'observed_unknown_%'
OR local_event_kind LIKE 'pump_swap.observed_unknown_%'
)
"#,
)
.execute(pool)
.await;
match query_result {
Ok(query_result) => return Ok(query_result.rows_affected()),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot delete deprecated PumpSwap observed_unknown coverage rows on sqlite: {}",
error
)));
},
}
},
}
}
async fn cleanup_duplicate_pump_swap_logical_coverage_rows(
&self,
decoder_code: &std::option::Option<std::string::String>,
) -> Result<u64, crate::Error> {
if let Some(decoder_code) = decoder_code {
if decoder_code != "pump_swap" {
return Ok(0);
}
}
match self.database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
DELETE FROM k_sol_dex_event_coverage_entries
WHERE decoder_code = 'pump_swap'
AND id NOT IN (
SELECT MIN(id)
FROM k_sol_dex_event_coverage_entries
WHERE decoder_code = 'pump_swap'
GROUP BY
decoder_code,
COALESCE(program_id, ''),
entry_kind,
entry_name,
COALESCE(discriminator_hex, ''),
COALESCE(local_event_kind, '')
)
"#,
)
.execute(pool)
.await;
match query_result {
Ok(query_result) => return Ok(query_result.rows_affected()),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot delete duplicate PumpSwap logical coverage rows on sqlite: {}",
error
)));
},
}
},
}
}
}
fn build_coverage_entry_from_upstream(
@@ -215,6 +315,9 @@ fn infer_expected_db_target_for_entry(
event_family: std::option::Option<&str>,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if decoder_code == "pump_swap" {
return infer_pump_swap_expected_db_target(entry_name, entry_kind);
}
if decoder_code == "raydium_cpmm"
&& (entry_name == "swap_event" || entry_name == "anchor_idl_instruction")
{
@@ -421,11 +524,201 @@ fn infer_expected_db_target(
return Some(target.to_string());
}
fn infer_pump_swap_expected_db_target(
entry_name: &str,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if entry_kind == crate::ENTRY_KIND_PROGRAM {
return None;
}
if entry_name == "buy" || entry_name == "sell" || entry_name == "buy_exact_quote_in" {
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_TRADE_EVENTS.to_string());
}
if entry_name.starts_with("observed_unknown_") {
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY.to_string());
}
if entry_name.ends_with("_event") && entry_name != "claim_token_incentives_event" {
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY.to_string());
}
if entry_name == "deposit"
|| entry_name == "deposit_event"
|| entry_name == "withdraw"
|| entry_name == "withdraw_event"
{
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_LIQUIDITY_EVENTS.to_string());
}
if entry_name == "create_pool" || entry_name == "create_pool_event" {
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_POOL_LIFECYCLE_EVENTS.to_string());
}
if entry_name == "collect_coin_creator_fee"
|| entry_name == "collect_coin_creator_fee_event"
|| entry_name == "transfer_creator_fees_to_pump"
|| entry_name == "transfer_creator_fees_to_pump_v2"
{
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_FEE_EVENTS.to_string());
}
if entry_name == "claim_cashback"
|| entry_name == "claim_cashback_event"
|| entry_name == "claim_token_incentives"
|| entry_name == "claim_token_incentives_event"
|| entry_name == "admin_update_token_incentives"
|| entry_name == "admin_update_token_incentives_event"
|| entry_name == "init_user_volume_accumulator"
|| entry_name == "init_user_volume_accumulator_event"
|| entry_name == "sync_user_volume_accumulator"
|| entry_name == "sync_user_volume_accumulator_event"
|| entry_name == "close_user_volume_accumulator"
|| entry_name == "close_user_volume_accumulator_event"
{
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_REWARD_EVENTS.to_string());
}
if entry_name == "admin_set_coin_creator"
|| entry_name == "admin_set_coin_creator_event"
|| entry_name == "create_config"
|| entry_name == "create_config_event"
|| entry_name == "disable"
|| entry_name == "disable_event"
|| entry_name == "extend_account"
|| entry_name == "extend_account_event"
|| entry_name == "migrate_pool_coin_creator"
|| entry_name == "migrate_pool_coin_creator_event"
|| entry_name == "reserved_fee_recipients_event"
|| entry_name == "set_bonding_curve_coin_creator_event"
|| entry_name == "set_coin_creator"
|| entry_name == "set_metaplex_coin_creator_event"
|| entry_name == "set_reserved_fee_recipient"
|| entry_name == "set_reserved_fee_recipients"
|| entry_name == "toggle_cashback_enabled"
|| entry_name == "toggle_mayhem_mode"
|| entry_name == "update_admin"
|| entry_name == "update_buyback_config"
|| entry_name == "update_admin_event"
|| entry_name == "update_fee_config"
|| entry_name == "update_fee_config_event"
{
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_POOL_ADMIN_EVENTS.to_string());
}
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY.to_string());
}
fn infer_pump_swap_event_family(
entry_name: &str,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if entry_kind == crate::ENTRY_KIND_PROGRAM {
return None;
}
if entry_name == "buy" || entry_name == "sell" || entry_name == "buy_exact_quote_in" {
return Some("swap".to_string());
}
if entry_name == "buy_event" || entry_name == "sell_event" {
return Some("swap_event_audit".to_string());
}
if entry_name == "deposit" || entry_name == "deposit_event" {
return Some("liquidity_add".to_string());
}
if entry_name == "withdraw" || entry_name == "withdraw_event" {
return Some("liquidity_remove".to_string());
}
if entry_name == "create_pool" || entry_name == "create_pool_event" {
return Some("pool_create".to_string());
}
if entry_name.starts_with("observed_unknown_") {
return Some("observed_unknown_instruction".to_string());
}
if entry_name == "toggle_cashback_enabled"
|| entry_name == "set_reserved_fee_recipient"
|| entry_name == "set_reserved_fee_recipients"
|| entry_name == "reserved_fee_recipients_event"
{
return Some("admin_config".to_string());
}
if entry_name.contains("creator_fee") || entry_name.contains("fee_recipient") {
return Some("fee".to_string());
}
if entry_name.contains("cashback")
|| entry_name.contains("incentive")
|| entry_name.contains("volume_accumulator")
{
return Some("reward".to_string());
}
if entry_name.contains("config")
|| entry_name.contains("admin")
|| entry_name.contains("disable")
|| entry_name.contains("toggle")
|| entry_name.contains("coin_creator")
|| entry_name.contains("extend_account")
{
return Some("admin_config".to_string());
}
return infer_event_family(entry_name, entry_kind);
}
fn pump_swap_local_event_kind(entry_name: &str) -> std::option::Option<std::string::String> {
if entry_name.ends_with("_event") {
return Some(format!("pump_swap.{}", entry_name));
}
match entry_name {
"admin_set_coin_creator" => return Some("pump_swap.admin_set_coin_creator".to_string()),
"admin_update_token_incentives" => {
return Some("pump_swap.admin_update_token_incentives".to_string());
},
"buy" => return Some("pump_swap.buy".to_string()),
"buy_exact_quote_in" => return Some("pump_swap.buy_exact_quote_in".to_string()),
"claim_cashback" => return Some("pump_swap.claim_cashback".to_string()),
"claim_token_incentives" => return Some("pump_swap.claim_token_incentives".to_string()),
"close_user_volume_accumulator" => {
return Some("pump_swap.close_user_volume_accumulator".to_string());
},
"collect_coin_creator_fee" => {
return Some("pump_swap.collect_coin_creator_fee".to_string());
},
"create_config" => return Some("pump_swap.create_config".to_string()),
"create_pool" => return Some("pump_swap.create_pool".to_string()),
"deposit" => return Some("pump_swap.deposit".to_string()),
"disable" => return Some("pump_swap.disable".to_string()),
"extend_account" => return Some("pump_swap.extend_account".to_string()),
"init_user_volume_accumulator" => {
return Some("pump_swap.init_user_volume_accumulator".to_string());
},
"migrate_pool_coin_creator" => {
return Some("pump_swap.migrate_pool_coin_creator".to_string());
},
"sell" => return Some("pump_swap.sell".to_string()),
"set_coin_creator" => return Some("pump_swap.set_coin_creator".to_string()),
"set_reserved_fee_recipients" => {
return Some("pump_swap.set_reserved_fee_recipients".to_string());
},
"sync_user_volume_accumulator" => {
return Some("pump_swap.sync_user_volume_accumulator".to_string());
},
"toggle_cashback_enabled" => return Some("pump_swap.toggle_cashback_enabled".to_string()),
"toggle_mayhem_mode" => return Some("pump_swap.toggle_mayhem_mode".to_string()),
"transfer_creator_fees_to_pump" => {
return Some("pump_swap.transfer_creator_fees_to_pump".to_string());
},
"transfer_creator_fees_to_pump_v2" => {
return Some("pump_swap.transfer_creator_fees_to_pump_v2".to_string());
},
"update_admin" => return Some("pump_swap.update_admin".to_string()),
"update_buyback_config" => return Some("pump_swap.update_buyback_config".to_string()),
"update_fee_config" => return Some("pump_swap.update_fee_config".to_string()),
"withdraw" => return Some("pump_swap.withdraw".to_string()),
"set_reserved_fee_recipient" => {
return Some("pump_swap.set_reserved_fee_recipient".to_string());
},
_ => return None,
}
}
fn infer_event_family_for_entry(
decoder_code: &str,
entry_name: &str,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if decoder_code == "pump_swap" {
return infer_pump_swap_event_family(entry_name, entry_kind);
}
if decoder_code == "raydium_launchpad" {
return infer_raydium_launchpad_event_family(entry_name, entry_kind);
}
@@ -488,7 +781,6 @@ fn infer_raydium_cpmm_event_family(
}
}
fn infer_raydium_stable_swap_event_family(
entry_name: &str,
entry_kind: &str,
@@ -789,7 +1081,6 @@ fn raydium_amm_v4_local_event_kind(entry_name: &str) -> std::option::Option<std:
}
}
fn raydium_stable_swap_local_event_kind(
entry_name: &str,
) -> std::option::Option<std::string::String> {
@@ -819,6 +1110,9 @@ pub(crate) fn known_local_event_kind(
decoder_code: &str,
entry_name: &str,
) -> std::option::Option<std::string::String> {
if decoder_code == "pump_swap" {
return pump_swap_local_event_kind(entry_name);
}
if decoder_code == "raydium_amm_v4" {
return raydium_amm_v4_local_event_kind(entry_name);
}

View File

@@ -319,6 +319,63 @@ fn resolve_instruction_name(
};
return Some(format!("raydium_launchpad.{}", layout.instruction_name));
}
if program_id == crate::PUMP_SWAP_PROGRAM_ID || decoder_code == Some("pump_swap") {
let name = match discriminator_hex {
"e445a52e51cb9a1d" => "anchor_self_cpi_log",
"f228759149606968" => "admin_set_coin_creator",
"d10b7357d5177ccc" => "admin_update_token_incentives",
"66063d1201daebea" => "buy",
"c62e1552b4d9e870" => "buy_exact_quote_in",
"253a237ebe35e4c5" => "claim_cashback",
"1004471ccc01281b" => "claim_token_incentives",
"f945a4da9667548a" => "close_user_volume_accumulator",
"a039592ab58b2b42" => "collect_coin_creator_fee",
"c9cff3724b6f2fbd" => "create_config",
"e992d18ecf6840bc" => "create_pool",
"f223c68952e1f2b6" => "deposit",
"b9adbb5ad80feee9" => "disable",
"ea66c2cb96483ee5" => "extend_account",
"5e06ca73ff60e8b7" => "init_user_volume_accumulator",
"d0089f044aaf103a" => "migrate_pool_coin_creator",
"33e685a4017f83ad" => "sell",
"d295802dbc3a4eaf" => "set_coin_creator",
"6faca2e87259d58e" => "set_reserved_fee_recipients",
"561fc057a3574fee" => "sync_user_volume_accumulator",
"7367e0ffbd5956c3" => "toggle_cashback_enabled",
"01096fd0641fffa3" => "toggle_mayhem_mode",
"8b348655e4e56cf1" => "transfer_creator_fees_to_pump",
"01214eb921432c5c" => "transfer_creator_fees_to_pump_v2",
"a1b028d53cb8b3e4" => "update_admin",
"fbe0ab92a01a71e9" => "update_buyback_config",
"68b867f258976b14" => "update_fee_config",
"b712469c946da122" => "withdraw",
"cfbdb247a77a44b4" => "set_reserved_fee_recipient",
"2ddc5d181961ac68" => "admin_set_coin_creator_event",
"93fa6c78f71d43de" => "admin_update_token_incentives_event",
"67f4521f2cf57777" => "buy_event",
"e2d6f62107f293e5" => "claim_cashback_event",
"4facf631cd5bcee8" => "claim_token_incentives_event",
"929fbdac925838f4" => "close_user_volume_accumulator_event",
"e8f5c2eeeada3a59" => "collect_coin_creator_fee_event",
"6b34598137e25116" => "create_config_event",
"b1310cd2a076a774" => "create_pool_event",
"78f83d531f8e6b90" => "deposit_event",
"6bfdc14ce4ca1b68" => "disable_event",
"6161d7905d92167c" => "extend_account_event",
"86240d48e86582d8" => "init_user_volume_accumulator_event",
"aadd52c793a5f72e" => "migrate_pool_coin_creator_event",
"2bbcfa12dd4bbb5f" => "reserved_fee_recipients_event",
"3e2f370aa503dc2a" => "sell_event",
"f2e7eb664163bdd3" => "set_bonding_curve_coin_creator_event",
"966bc77b7ccf66e4" => "set_metaplex_coin_creator_event",
"c57aa77c74515bff" => "sync_user_volume_accumulator_event",
"e198ab57f63f42ea" => "update_admin_event",
"5a1741233ef4bcd0" => "update_fee_config_event",
"1609851aa02c47c0" => "withdraw_event",
_ => return None,
};
return Some(name.to_string());
}
return None;
}

View File

@@ -1183,6 +1183,8 @@ pub use dex::PumpFunTradeDecoded;
pub use dex::PumpSwapDecodedEvent;
/// PumpSwap decoder.
pub use dex::PumpSwapDecoder;
/// Decoded PumpSwap non-trade instruction event.
pub use dex::PumpSwapInstructionDecoded;
/// Decoded PumpSwap trade event.
pub use dex::PumpSwapTradeDecoded;
/// Decoded Raydium AmmV4 event.

View File

@@ -109,6 +109,9 @@ impl NonTradeEventMaterializationService {
continue;
},
};
if is_anchor_event_audit_only(&payload) {
continue;
}
if crate::is_dex_pool_lifecycle_event_kind(decoded_event.event_kind.as_str()) {
let cleanup_result =
self.delete_stale_pool_admin_event_for_lifecycle(decoded_event).await;
@@ -1935,6 +1938,18 @@ fn extract_first_bool(
return None;
}
fn is_anchor_event_audit_only(payload: &serde_json::Value) -> bool {
if let Some(object) = payload.as_object() {
let flag = object.get("anchorEventAuditOnly");
if let Some(flag) = flag {
if flag.as_bool() == Some(true) {
return true;
}
}
}
return false;
}
fn transaction_has_effective_error(transaction: &crate::ChainTransactionDto) -> bool {
let err_json = match transaction.err_json.as_ref() {
Some(err_json) => err_json.trim(),

View File

@@ -47,9 +47,10 @@ impl TradeAggregationService {
Err(error) => return Err(error),
};
let transaction = transaction_context.transaction;
if transaction.err_json.is_some() {
if crate::trade_aggregation::transaction_has_effective_error(&transaction) {
tracing::debug!(
signature = %transaction.signature,
err_json = ?transaction.err_json,
"skipping trade aggregation for failed transaction"
);
return Ok(std::vec::Vec::new());
@@ -199,6 +200,21 @@ impl TradeAggregationService {
}
}
fn transaction_has_effective_error(transaction: &crate::ChainTransactionDto) -> bool {
let err_json = match transaction.err_json.as_ref() {
Some(err_json) => err_json.trim(),
None => return false,
};
if err_json.is_empty() {
return false;
}
if err_json == "null" {
return false;
}
return true;
}
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc<crate::Database> {
@@ -416,6 +432,51 @@ mod tests {
assert_eq!(pair_metric.trade_count, 1);
}
#[test]
fn transaction_null_err_json_is_not_effective_error() {
let transaction = crate::ChainTransactionDto::new(
"sig-null-err".to_string(),
Some(1),
None,
Some("test".to_string()),
Some("0".to_string()),
Some("null".to_string()),
None,
serde_json::json!({"meta":{"err":null}}).to_string(),
);
assert!(!super::transaction_has_effective_error(&transaction));
}
#[test]
fn transaction_empty_err_json_is_not_effective_error() {
let transaction = crate::ChainTransactionDto::new(
"sig-empty-err".to_string(),
Some(1),
None,
Some("test".to_string()),
Some("0".to_string()),
Some("".to_string()),
None,
serde_json::json!({"meta":{"err":null}}).to_string(),
);
assert!(!super::transaction_has_effective_error(&transaction));
}
#[test]
fn transaction_non_null_err_json_is_effective_error() {
let transaction = crate::ChainTransactionDto::new(
"sig-real-err".to_string(),
Some(1),
None,
Some("test".to_string()),
Some("0".to_string()),
Some(serde_json::json!({"InstructionError":[0,{"Custom":1}]}).to_string()),
None,
serde_json::json!({"meta":{"err":{"InstructionError":[0,{"Custom":1}]}}}).to_string(),
);
assert!(super::transaction_has_effective_error(&transaction));
}
#[tokio::test]
async fn record_transaction_by_signature_skips_failed_transaction() {
let database = make_database().await;

View File

@@ -138,6 +138,14 @@ pub(crate) async fn resolve_trade_amounts(
return Err(error);
}
}
if input.decoded_event.event_kind == "pump_swap.buy_exact_quote_in" {
crate::trade_amount_resolution::normalize_pump_swap_anchor_buy_exact_quote_in_amounts(
input,
&mut base_amount_raw,
&mut quote_amount_raw,
&mut resolved_trade_side,
);
}
if input.decoded_event.event_kind.starts_with("raydium_launchpad.")
&& (base_amount_raw.is_none()
|| quote_amount_raw.is_none()
@@ -442,7 +450,12 @@ async fn apply_pump_swap_amount_fallbacks(
None => payload_pool_quote_token_account.as_deref(),
};
let (input_vault_address, output_vault_address, input_token_account, output_token_account) =
if input.decoded_event.event_kind.ends_with(".buy") {
if input.decoded_event.event_kind.ends_with(".buy")
|| input
.decoded_event
.event_kind
.ends_with(".buy_exact_quote_in")
{
(
effective_quote_vault_address,
effective_base_vault_address,
@@ -1401,6 +1414,74 @@ async fn load_decoded_instruction(
return Ok(instruction_option);
}
fn normalize_pump_swap_anchor_buy_exact_quote_in_amounts(
input: &crate::trade_amount_resolution::TradeAmountResolutionInput<'_>,
base_amount_raw: &mut std::option::Option<std::string::String>,
quote_amount_raw: &mut std::option::Option<std::string::String>,
resolved_trade_side: &mut std::option::Option<crate::SwapTradeSide>,
) {
let amount_source = crate::trade_amount_resolution::extract_string_by_candidate_keys(
input.payload,
&["amountSource", "amount_source"],
);
if amount_source.as_deref() != Some("pump_swap_anchor_buy_event") {
return;
}
let token_a_mint = crate::trade_amount_resolution::extract_string_by_candidate_keys(
input.payload,
&["tokenAMint", "token_a_mint"],
);
let token_b_mint = crate::trade_amount_resolution::extract_string_by_candidate_keys(
input.payload,
&["tokenBMint", "token_b_mint"],
);
let token_a_mint = match token_a_mint.as_deref() {
Some(token_a_mint) => token_a_mint,
None => return,
};
let token_b_mint = match token_b_mint.as_deref() {
Some(token_b_mint) => token_b_mint,
None => return,
};
let pair_base_mint = match input.base_token_mint {
Some(pair_base_mint) => pair_base_mint,
None => return,
};
let pair_quote_mint = match input.quote_token_mint {
Some(pair_quote_mint) => pair_quote_mint,
None => return,
};
let anchor_base_amount_raw = crate::trade_amount_resolution::extract_amount_string(
input.payload,
&["baseAmountRaw", "baseAmountOutRaw", "base_amount_out_raw"],
);
let anchor_quote_amount_raw = crate::trade_amount_resolution::extract_amount_string(
input.payload,
&["quoteAmountRaw", "userQuoteAmountInRaw", "user_quote_amount_in_raw"],
);
let anchor_base_amount_raw = match anchor_base_amount_raw {
Some(anchor_base_amount_raw) => anchor_base_amount_raw,
None => return,
};
let anchor_quote_amount_raw = match anchor_quote_amount_raw {
Some(anchor_quote_amount_raw) => anchor_quote_amount_raw,
None => return,
};
if pair_base_mint == token_a_mint && pair_quote_mint == token_b_mint {
*base_amount_raw = Some(anchor_base_amount_raw);
*quote_amount_raw = Some(anchor_quote_amount_raw);
*resolved_trade_side = Some(crate::SwapTradeSide::BuyBase);
return;
}
if pair_base_mint == token_b_mint && pair_quote_mint == token_a_mint {
*base_amount_raw = Some(anchor_quote_amount_raw);
*quote_amount_raw = Some(anchor_base_amount_raw);
*resolved_trade_side = Some(crate::SwapTradeSide::SellBase);
return;
}
}
fn extract_amount_string(
payload: &serde_json::Value,
candidate_keys: &[&str],

View File

@@ -23,10 +23,10 @@ pub(crate) fn extract_trade_side(
Some("SELL") => return crate::SwapTradeSide::SellBase,
_ => {},
}
if event_kind.ends_with(".buy") {
if event_kind.ends_with(".buy") || event_kind.contains(".buy_exact") {
return crate::SwapTradeSide::BuyBase;
}
if event_kind.ends_with(".sell") {
if event_kind.ends_with(".sell") || event_kind.contains(".sell_exact") {
return crate::SwapTradeSide::SellBase;
}
return crate::SwapTradeSide::Unknown;
@@ -109,6 +109,13 @@ mod tests {
assert_eq!(side, crate::SwapTradeSide::SellBase);
}
#[test]
fn buy_exact_suffix_is_resolved_as_buy_base() {
let payload = serde_json::json!({});
let side = super::extract_trade_side("pump_swap.buy_exact_quote_in", &payload);
assert_eq!(side, crate::SwapTradeSide::BuyBase);
}
#[test]
fn unknown_side_is_returned_when_no_hint_exists() {
let payload = serde_json::json!({});

View File

@@ -14,9 +14,8 @@ const UPSTREAM_GIT_ALIAS_PROGRAM_NOTES: &str = "upstream Git decoder name kept a
const RAYDIUM_IDL_SOURCE_REPO: &str = "raydium-io/raydium-idl";
const MANUAL_SOLSCAN_SOURCE_REPO: &str = "manual-solscan";
const RAYDIUM_IDL_DISCRIMINATOR_NOTES: &str = "entry name and discriminator extracted from Raydium official IDL snapshot; not corpus-verified; no trade/candle/materialization proof";
const MANUAL_SOLSCAN_DISCRIMINATOR_NOTES: &str = "entry name and discriminator derived from local corpus plus manual Solscan transaction-log inspection; no trade/candle/materialization proof";
const MANUAL_SOLSCAN_DISCRIMINATOR_NOTES: &str = "entry name and discriminator derived from manual Solscan program IDL or Solscan transaction-log inspection; no trade/candle/materialization proof";
const fn manual_solscan_discriminator_entry(
decoder_code: &'static str,
@@ -10735,6 +10734,39 @@ pub(crate) const UPSTREAM_REGISTRY_ENTRIES: &[crate::UpstreamRegistryEntry] = &[
8,
"decoders/pump-swap-decoder/src/instructions/withdraw.rs",
),
manual_solscan_discriminator_entry(
"pump_swap",
Some(crate::PUMP_SWAP_PROGRAM_ID),
"pump",
"amm",
crate::ENTRY_KIND_INSTRUCTION,
"transfer_creator_fees_to_pump_v2",
"01214eb921432c5c",
8,
"solscan.io/account/pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA#programIdl:transfer_creator_fees_to_pump_v2",
),
manual_solscan_discriminator_entry(
"pump_swap",
Some(crate::PUMP_SWAP_PROGRAM_ID),
"pump",
"amm",
crate::ENTRY_KIND_INSTRUCTION,
"update_buyback_config",
"fbe0ab92a01a71e9",
8,
"solscan.io/account/pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA#programIdl:update_buyback_config",
),
manual_solscan_discriminator_entry(
"pump_swap",
Some(crate::PUMP_SWAP_PROGRAM_ID),
"pump",
"amm",
crate::ENTRY_KIND_INSTRUCTION,
"set_reserved_fee_recipient",
"cfbdb247a77a44b4",
8,
"validation_sql/SQL_VALIDATION_PUMP_SWAP_0_7_53.sql#local_log_instruction_names",
),
upstream_git_discriminator_entry(
"pump_swap",
Some(crate::PUMP_SWAP_PROGRAM_ID),