This commit is contained in:
2026-05-29 07:38:24 +02:00
parent 96b6209482
commit ffa4acbccb
15 changed files with 1982 additions and 107 deletions

View File

@@ -21,8 +21,8 @@ pub use dtos::ChainSlotDto;
pub use dtos::ChainTransactionDto;
pub use dtos::DbMetadataDto;
pub use dtos::DbRuntimeEventDto;
pub use dtos::DexDecodedEventDto;
pub use dtos::DexDecodeReplayLedgerDto;
pub use dtos::DexDecodedEventDto;
pub use dtos::DexDto;
pub use dtos::FeeEventDto;
pub use dtos::KnownHttpEndpointDto;
@@ -88,8 +88,8 @@ pub use entities::ChainSlotEntity;
pub use entities::ChainTransactionEntity;
pub use entities::DbMetadataEntity;
pub use entities::DbRuntimeEventEntity;
pub use entities::DexDecodedEventEntity;
pub use entities::DexDecodeReplayLedgerEntity;
pub use entities::DexDecodedEventEntity;
pub use entities::DexEntity;
pub use entities::FeeEventEntity;
pub use entities::KnownHttpEndpointEntity;
@@ -142,14 +142,16 @@ pub use queries::query_db_metadatas_list;
pub use queries::query_db_metadatas_upsert;
pub use queries::query_db_runtime_events_insert;
pub use queries::query_db_runtime_events_list_recent;
pub use queries::query_dex_decode_replay_ledger_get_by_signature;
pub use queries::query_dex_decode_replay_ledger_get_by_transaction;
pub use queries::query_dex_decode_replay_ledger_upsert;
pub use queries::query_dex_decoded_events_delete_by_key;
pub use queries::query_dex_decoded_events_delete_meteora_dlmm_anchor_swap_instruction_audits;
pub use queries::query_dex_decoded_events_delete_related_instruction_audit;
pub use queries::query_dex_decoded_events_get_by_key;
pub use queries::query_dex_decoded_events_get_latest_pump_fun_create_payload_by_mint;
pub use queries::query_dex_decoded_events_list_by_transaction_id;
pub use queries::query_dex_decoded_events_upsert;
pub use queries::query_dex_decode_replay_ledger_get_by_signature;
pub use queries::query_dex_decode_replay_ledger_get_by_transaction;
pub use queries::query_dex_decode_replay_ledger_upsert;
pub use queries::query_dexs_get_by_code;
pub use queries::query_dexs_list;
pub use queries::query_dexs_upsert;

View File

@@ -9,8 +9,8 @@ mod chain_transaction;
mod db_metadata;
mod db_runtime_event;
mod dex;
mod dex_decoded_event;
mod dex_decode_replay_ledger;
mod dex_decoded_event;
mod fee_event;
mod known_http_endpoint;
mod known_ws_endpoint;
@@ -66,14 +66,16 @@ pub use db_runtime_event::query_db_runtime_events_list_recent;
pub use dex::query_dexs_get_by_code;
pub use dex::query_dexs_list;
pub use dex::query_dexs_upsert;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_get_by_signature;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_get_by_transaction;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_upsert;
pub use dex_decoded_event::query_dex_decoded_events_delete_by_key;
pub use dex_decoded_event::query_dex_decoded_events_delete_meteora_dlmm_anchor_swap_instruction_audits;
pub use dex_decoded_event::query_dex_decoded_events_delete_related_instruction_audit;
pub use dex_decoded_event::query_dex_decoded_events_get_by_key;
pub use dex_decoded_event::query_dex_decoded_events_get_latest_pump_fun_create_payload_by_mint;
pub use dex_decoded_event::query_dex_decoded_events_list_by_transaction_id;
pub use dex_decoded_event::query_dex_decoded_events_upsert;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_get_by_signature;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_get_by_transaction;
pub use dex_decode_replay_ledger::query_dex_decode_replay_ledger_upsert;
pub use fee_event::query_fee_events_get_by_decoded_event_id;
pub use fee_event::query_fee_events_list_recent;
pub use fee_event::query_fee_events_upsert;

View File

@@ -128,6 +128,109 @@ WHERE transaction_id = ?
}
}
/// Deletes decoded DEX instruction audit rows related to one decoded instruction.
///
/// This removes an audit row attached to the decoded instruction itself, its direct
/// parent instruction, or its direct child instructions inside the same transaction.
pub async fn query_dex_decoded_events_delete_related_instruction_audit(
database: &crate::Database,
transaction_id: i64,
instruction_id: i64,
audit_event_kind: &str,
) -> Result<u64, crate::Error> {
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
DELETE FROM k_sol_dex_decoded_events
WHERE transaction_id = ?
AND event_kind = ?
AND instruction_id IN (
SELECT id
FROM k_sol_chain_instructions
WHERE transaction_id = ?
AND id = ?
UNION
SELECT parent_instruction_id
FROM k_sol_chain_instructions
WHERE transaction_id = ?
AND id = ?
AND parent_instruction_id IS NOT NULL
UNION
SELECT id
FROM k_sol_chain_instructions
WHERE transaction_id = ?
AND parent_instruction_id = ?
)
"#,
)
.bind(transaction_id)
.bind(audit_event_kind)
.bind(transaction_id)
.bind(instruction_id)
.bind(transaction_id)
.bind(instruction_id)
.bind(transaction_id)
.bind(instruction_id)
.execute(pool)
.await;
match query_result {
Ok(result) => return Ok(result.rows_affected()),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot delete related instruction audit events on sqlite: {}",
error
)));
},
}
},
}
}
/// Deletes Meteora DLMM Anchor self-CPI swap audit rows already covered by decoded swaps.
///
/// This targets only local-corpus-observed Anchor event discriminators that are
/// decoded into `meteora_dlmm.swap` payload enrichment. It does not delete
/// unrelated DLMM instruction audits.
pub async fn query_dex_decoded_events_delete_meteora_dlmm_anchor_swap_instruction_audits(
database: &crate::Database,
transaction_id: i64,
) -> Result<u64, crate::Error> {
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
DELETE FROM k_sol_dex_decoded_events
WHERE transaction_id = ?
AND protocol_name = 'meteora_dlmm'
AND event_kind = 'meteora_dlmm.instruction_audit'
AND json_extract(payload_json, '$.anchorSelfCpiLog') = 1
AND json_extract(payload_json, '$.anchorEventDiscriminatorHex') IN (
'516ce3becdd00ac4',
'2e7452d7941b544d'
)
"#,
)
.bind(transaction_id)
.execute(pool)
.await;
match query_result {
Ok(result) => return Ok(result.rows_affected()),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot delete Meteora DLMM Anchor swap audit events on sqlite: {}",
error
)));
},
}
},
}
}
/// Reads one decoded DEX event by its natural key.
pub async fn query_dex_decoded_events_get_by_key(
database: &crate::Database,
@@ -400,4 +503,106 @@ mod tests {
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].event_kind, "raydium_amm_v4.initialize2_pool");
}
#[tokio::test]
async fn related_instruction_audit_delete_removes_parent_audit_for_child_decode() {
let database = make_database().await;
let transaction_dto = crate::ChainTransactionDto::new(
"sig-dex-event-related-audit-test-1".to_string(),
None,
None,
Some("helius_primary_http".to_string()),
Some("0".to_string()),
None,
None,
r#"{"transaction":{"message":{"instructions":[]}}}"#.to_string(),
);
let transaction_id_result =
crate::query_chain_transactions_upsert(&database, &transaction_dto).await;
let transaction_id = match transaction_id_result {
Ok(transaction_id) => transaction_id,
Err(error) => panic!("transaction upsert must succeed: {}", error),
};
let parent_instruction_dto = crate::ChainInstructionDto::new(
transaction_id,
None,
0,
None,
Some(crate::METEORA_DLMM_PROGRAM_ID.to_string()),
Some("meteora-dlmm".to_string()),
Some(1),
r#"["ParentAccount","Pool111"]"#.to_string(),
None,
None,
None,
);
let parent_instruction_id_result =
crate::query_chain_instructions_insert(&database, &parent_instruction_dto).await;
let parent_instruction_id = match parent_instruction_id_result {
Ok(parent_instruction_id) => parent_instruction_id,
Err(error) => panic!("parent instruction insert must succeed: {}", error),
};
let child_instruction_dto = crate::ChainInstructionDto::new(
transaction_id,
Some(parent_instruction_id),
0,
Some(0),
Some(crate::METEORA_DLMM_PROGRAM_ID.to_string()),
Some("meteora-dlmm".to_string()),
Some(2),
r#"["ChildAccount","Pool111"]"#.to_string(),
None,
None,
None,
);
let child_instruction_id_result =
crate::query_chain_instructions_insert(&database, &child_instruction_dto).await;
let child_instruction_id = match child_instruction_id_result {
Ok(child_instruction_id) => child_instruction_id,
Err(error) => panic!("child instruction insert must succeed: {}", error),
};
let audit_dto = crate::DexDecodedEventDto::new(
transaction_id,
Some(parent_instruction_id),
"meteora_dlmm".to_string(),
crate::METEORA_DLMM_PROGRAM_ID.to_string(),
"meteora_dlmm.instruction_audit".to_string(),
Some("Pool111".to_string()),
None,
None,
None,
None,
r#"{"audit":true}"#.to_string(),
);
let audit_upsert_result =
crate::query_dex_decoded_events_upsert(&database, &audit_dto).await;
match audit_upsert_result {
Ok(_) => {},
Err(error) => panic!("audit upsert must succeed: {}", error),
}
let deleted_result = super::query_dex_decoded_events_delete_related_instruction_audit(
&database,
transaction_id,
child_instruction_id,
"meteora_dlmm.instruction_audit",
)
.await;
let deleted = match deleted_result {
Ok(deleted) => deleted,
Err(error) => panic!("related audit delete must succeed: {}", error),
};
assert_eq!(deleted, 1);
let fetched_result = crate::query_dex_decoded_events_get_by_key(
&database,
transaction_id,
Some(parent_instruction_id),
"meteora_dlmm.instruction_audit",
)
.await;
let fetched = match fetched_result {
Ok(fetched) => fetched,
Err(error) => panic!("audit fetch must succeed: {}", error),
};
assert!(fetched.is_none());
}
}

View File

@@ -38,8 +38,10 @@ pub use meteora_dbc::MeteoraDbcSwapDecoded;
pub use meteora_dlmm::MeteoraDlmmCreatePoolDecoded;
pub use meteora_dlmm::MeteoraDlmmDecodedEvent;
pub use meteora_dlmm::MeteoraDlmmDecoder;
pub use meteora_dlmm::MeteoraDlmmFeeDecoded;
pub use meteora_dlmm::MeteoraDlmmLiquidityDecoded;
pub use meteora_dlmm::MeteoraDlmmPoolLifecycleDecoded;
pub use meteora_dlmm::MeteoraDlmmRewardDecoded;
pub use meteora_dlmm::MeteoraDlmmSwapDecoded;
pub use orca_whirlpools::OrcaWhirlpoolsCreatePoolDecoded;
pub use orca_whirlpools::OrcaWhirlpoolsDecodedEvent;

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,8 @@
//! Persistence-oriented DEX decoding service.
const METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX: &str = "e445a52e51cb9a1d";
/// DEX decode service.
#[derive(Debug, Clone)]
pub struct DexDecodeService {
@@ -206,7 +208,7 @@ impl DexDecodeService {
Err(error) => return Err(error),
};
let cleanup_result = self
.delete_replaced_raydium_instruction_audit(
.delete_replaced_instruction_audit(
transaction_id,
instruction_id,
protocol_name,
@@ -219,7 +221,7 @@ impl DexDecodeService {
return Ok(materialized);
}
async fn delete_replaced_raydium_instruction_audit(
async fn delete_replaced_instruction_audit(
&self,
transaction_id: i64,
instruction_id: i64,
@@ -229,15 +231,14 @@ impl DexDecodeService {
if event_kind.ends_with(".instruction_audit") {
return Ok(());
}
let audit_event_kind = match raydium_instruction_audit_event_kind_by_protocol(protocol_name)
{
let audit_event_kind = match instruction_audit_event_kind_by_protocol(protocol_name) {
Some(audit_event_kind) => audit_event_kind,
None => return Ok(()),
};
let delete_result = crate::query_dex_decoded_events_delete_by_key(
let delete_result = crate::query_dex_decoded_events_delete_related_instruction_audit(
self.database.as_ref(),
transaction_id,
Some(instruction_id),
instruction_id,
audit_event_kind,
)
.await;
@@ -505,6 +506,42 @@ impl DexDecodeService {
)
.await;
},
crate::MeteoraDlmmDecodedEvent::Fee(event) => {
return self
.materialize_named_dex_event(
transaction,
event.transaction_id,
event.instruction_id,
"meteora_dlmm",
event.program_id.clone(),
event.event_kind.as_str(),
event.pool_account.clone(),
None,
None,
None,
None,
event.payload_json.clone(),
)
.await;
},
crate::MeteoraDlmmDecodedEvent::Reward(event) => {
return self
.materialize_named_dex_event(
transaction,
event.transaction_id,
event.instruction_id,
"meteora_dlmm",
event.program_id.clone(),
event.event_kind.as_str(),
event.pool_account.clone(),
None,
None,
None,
None,
event.payload_json.clone(),
)
.await;
},
}
}
@@ -1167,6 +1204,13 @@ impl DexDecodeService {
if decoded_instruction_ids.contains(&instruction_id) {
continue;
}
if is_meteora_dlmm_anchor_swap_log_replaced_by_decoded_swap(
audit_spec.protocol_name,
instruction,
decoded_events.as_slice(),
) {
continue;
}
let accounts = parse_instruction_accounts_vec(instruction.accounts_json.as_str());
let payload = build_meteora_instruction_audit_payload(
transaction,
@@ -1773,6 +1817,35 @@ fn candidate_meteora_audit_pool_account(
return accounts.get(index).cloned();
}
fn is_meteora_dlmm_anchor_swap_log_replaced_by_decoded_swap(
protocol_name: &str,
instruction: &crate::ChainInstructionDto,
decoded_events: &[crate::DexDecodedEventDto],
) -> bool {
if protocol_name != "meteora_dlmm" {
return false;
}
let data_base58 = parse_instruction_data_base58(instruction.data_json.as_deref());
let data_bytes = instruction_data_bytes_from_base58(data_base58.as_deref());
let selector_hex = discriminator_hex_from_bytes(data_bytes.as_deref(), 0);
if selector_hex.as_deref() != Some(METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX) {
return false;
}
let event_discriminator_hex = discriminator_hex_from_bytes(data_bytes.as_deref(), 8);
match event_discriminator_hex.as_deref() {
Some("516ce3becdd00ac4") | Some("2e7452d7941b544d") => {},
_ => return false,
}
for decoded_event in decoded_events {
if decoded_event.protocol_name == "meteora_dlmm"
&& decoded_event.event_kind == "meteora_dlmm.swap"
{
return true;
}
}
return false;
}
fn build_meteora_instruction_audit_payload(
transaction: &crate::ChainTransactionDto,
instruction: &crate::ChainInstructionDto,
@@ -1786,10 +1859,36 @@ fn build_meteora_instruction_audit_payload(
None => 0,
};
let data_base58 = parse_instruction_data_base58(instruction.data_json.as_deref());
let discriminator_hex = discriminator_hex_from_base58(data_base58.as_deref());
let data_bytes = instruction_data_bytes_from_base58(data_base58.as_deref());
let discriminator_hex = discriminator_hex_from_bytes(data_bytes.as_deref(), 0);
let anchor_self_cpi_log =
discriminator_hex.as_deref() == Some(METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX);
let anchor_event_discriminator_hex = if anchor_self_cpi_log {
discriminator_hex_from_bytes(data_bytes.as_deref(), 8)
} else {
None
};
let anchor_event_payload_size = if anchor_self_cpi_log {
match data_bytes.as_ref() {
Some(data_bytes) => data_bytes.len().checked_sub(8),
None => None,
}
} else {
None
};
let data_prefix = data_base58
.as_ref()
.map(|value| return value.chars().take(16).collect::<std::string::String>());
let audit_reason = if anchor_self_cpi_log {
"meteora_anchor_self_cpi_log_not_decoded_by_specific_event_decoder"
} else {
"meteora_instruction_not_decoded_by_specific_decoder"
};
let proof_status = if anchor_self_cpi_log {
"observed_local_corpus_anchor_self_cpi_log"
} else {
"unclassified_local_corpus_instruction"
};
return serde_json::json!({
"decoder": protocol_name,
"eventKind": event_kind,
@@ -1806,8 +1905,12 @@ fn build_meteora_instruction_audit_payload(
"data": data_base58,
"dataPrefix": data_prefix,
"discriminatorHex": discriminator_hex,
"auditReason": "meteora_instruction_not_decoded_by_specific_decoder",
"proofStatus": "unclassified_local_corpus_instruction",
"anchorSelfCpiLog": anchor_self_cpi_log,
"anchorSelfCpiLogSelectorHex": if anchor_self_cpi_log { Some(METEORA_ANCHOR_SELF_CPI_LOG_SELECTOR_HEX) } else { None },
"anchorEventDiscriminatorHex": anchor_event_discriminator_hex,
"anchorEventPayloadSize": anchor_event_payload_size,
"auditReason": audit_reason,
"proofStatus": proof_status,
"tradeCandidate": false,
"candleCandidate": false,
"nonTradeUseful": false,
@@ -1816,13 +1919,14 @@ fn build_meteora_instruction_audit_payload(
});
}
fn raydium_instruction_audit_event_kind_by_protocol(
fn instruction_audit_event_kind_by_protocol(
protocol_name: &str,
) -> std::option::Option<&'static str> {
match protocol_name {
"raydium_amm_v4" => return Some("raydium_amm_v4.instruction_audit"),
"raydium_clmm" => return Some("raydium_clmm.instruction_audit"),
"raydium_cpmm" => return Some("raydium_cpmm.instruction_audit"),
"meteora_dlmm" => return Some("meteora_dlmm.instruction_audit"),
_ => return None,
}
}
@@ -1932,21 +2036,28 @@ fn parse_instruction_data_base58(
fn discriminator_hex_from_base58(
data_base58: std::option::Option<&str>,
) -> std::option::Option<std::string::String> {
let data_base58 = match data_base58 {
Some(data_base58) => data_base58,
let bytes = instruction_data_bytes_from_base58(data_base58);
return discriminator_hex_from_bytes(bytes.as_deref(), 0);
}
fn discriminator_hex_from_bytes(
bytes: std::option::Option<&[u8]>,
offset: usize,
) -> std::option::Option<std::string::String> {
let bytes = match bytes {
Some(bytes) => bytes,
None => return None,
};
let bytes_result = bs58::decode(data_base58).into_vec();
let bytes = match bytes_result {
Ok(bytes) => bytes,
Err(_) => return None,
};
if bytes.len() < 8 {
if bytes.len() < offset + 8 {
return None;
}
let mut text = std::string::String::new();
for byte in bytes.iter().take(8) {
let mut index = offset;
let end = offset + 8;
while index < end {
let byte = bytes[index];
text.push_str(format!("{byte:02x}").as_str());
index += 1;
}
return Some(text);
}
@@ -3012,4 +3123,17 @@ mod tests {
};
assert_eq!(initialize.event_kind, "raydium_cpmm.initialize");
}
#[test]
fn maps_instruction_audit_event_kind_for_raydium_and_meteora_dlmm_protocols() {
assert_eq!(
super::instruction_audit_event_kind_by_protocol("raydium_clmm"),
Some("raydium_clmm.instruction_audit")
);
assert_eq!(
super::instruction_audit_event_kind_by_protocol("meteora_dlmm"),
Some("meteora_dlmm.instruction_audit")
);
assert_eq!(super::instruction_audit_event_kind_by_protocol("unknown"), None);
}
}

View File

@@ -335,6 +335,9 @@ pub fn is_dex_liquidity_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".close_position") {
return true;
}
if event_kind.contains(".position_close") {
return true;
}
return false;
}
@@ -374,6 +377,9 @@ pub fn is_dex_position_open_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".open_position") {
return true;
}
if event_kind.contains(".position_create") {
return true;
}
return false;
}
@@ -399,6 +405,9 @@ pub fn is_dex_fee_event_kind(event_kind: &str) -> bool {
if event_kind.contains("collect_fee") {
return true;
}
if event_kind.contains("claim_fee") {
return true;
}
return false;
}
@@ -1041,6 +1050,19 @@ mod tests {
assert!(!super::decoded_payload_has_trade_amount_or_price_payload(&empty_payload_json));
}
#[test]
fn classifies_dlmm_claim_fee2_as_fee_collection() {
assert_eq!(super::classify_dex_event_category_code("meteora_dlmm.claim_fee2"), "fee");
assert_eq!(
super::classify_dex_event_lifecycle_kind_code("meteora_dlmm.claim_fee2"),
"fee_collection"
);
assert_eq!(
super::classify_dex_event_actionability_code("meteora_dlmm.claim_fee2", false, false,),
"non_trade_useful"
);
}
#[test]
fn classifies_dlmm_add_remove_liquidity_and_positions_as_non_trade_useful() {
assert_eq!(

View File

@@ -343,14 +343,14 @@ pub use db::DbRuntimeEventDto;
pub use db::DbRuntimeEventEntity;
/// Runtime event level used by the local database layer.
pub use db::DbRuntimeEventLevel;
/// Application-facing decoded DEX event DTO.
pub use db::DexDecodedEventDto;
/// Application-facing DEX decode replay ledger DTO.
pub use db::DexDecodeReplayLedgerDto;
/// Persisted decoded DEX event row.
pub use db::DexDecodedEventEntity;
/// Persisted DEX decode replay ledger row.
pub use db::DexDecodeReplayLedgerEntity;
/// Application-facing decoded DEX event DTO.
pub use db::DexDecodedEventDto;
/// Persisted decoded DEX event row.
pub use db::DexDecodedEventEntity;
/// Application-facing normalized DEX DTO.
pub use db::DexDto;
/// Persisted normalized DEX row.
@@ -591,8 +591,18 @@ pub use db::query_db_metadatas_upsert;
pub use db::query_db_runtime_events_insert;
/// Lists recent runtime events ordered from newest to oldest.
pub use db::query_db_runtime_events_list_recent;
/// Reads one DEX decode replay ledger row by signature and decoder identity.
pub use db::query_dex_decode_replay_ledger_get_by_signature;
/// Reads one DEX decode replay ledger row by transaction and decoder identity.
pub use db::query_dex_decode_replay_ledger_get_by_transaction;
/// Inserts or updates one DEX decode replay ledger row.
pub use db::query_dex_decode_replay_ledger_upsert;
/// Deletes one decoded DEX event row by its natural key.
pub use db::query_dex_decoded_events_delete_by_key;
/// Deletes Meteora DLMM Anchor self-CPI swap audit rows already covered by decoded swaps.
pub use db::query_dex_decoded_events_delete_meteora_dlmm_anchor_swap_instruction_audits;
/// Deletes decoded DEX instruction audit rows related to one decoded instruction.
pub use db::query_dex_decoded_events_delete_related_instruction_audit;
/// Reads one decoded DEX event by its natural key.
pub use db::query_dex_decoded_events_get_by_key;
/// Returns the latest Pump.fun create payload associated with a token mint.
@@ -601,12 +611,6 @@ pub use db::query_dex_decoded_events_get_latest_pump_fun_create_payload_by_mint;
pub use db::query_dex_decoded_events_list_by_transaction_id;
/// Inserts or updates one decoded DEX event row.
pub use db::query_dex_decoded_events_upsert;
/// Reads one DEX decode replay ledger row by signature and decoder identity.
pub use db::query_dex_decode_replay_ledger_get_by_signature;
/// Reads one DEX decode replay ledger row by transaction and decoder identity.
pub use db::query_dex_decode_replay_ledger_get_by_transaction;
/// Inserts or updates one DEX decode replay ledger row.
pub use db::query_dex_decode_replay_ledger_upsert;
/// Reads one normalized DEX row by code.
pub use db::query_dexs_get_by_code;
/// Lists normalized DEX rows.
@@ -921,10 +925,14 @@ pub use dex::MeteoraDlmmCreatePoolDecoded;
pub use dex::MeteoraDlmmDecodedEvent;
/// Meteora DLMM decoder.
pub use dex::MeteoraDlmmDecoder;
/// Decoded Meteora DLMM fee collection event.
pub use dex::MeteoraDlmmFeeDecoded;
/// Decoded Meteora DLMM liquidity lifecycle event.
pub use dex::MeteoraDlmmLiquidityDecoded;
/// Decoded Meteora DLMM pool lifecycle event.
pub use dex::MeteoraDlmmPoolLifecycleDecoded;
/// Decoded Meteora DLMM reward or emission event.
pub use dex::MeteoraDlmmRewardDecoded;
/// Decoded Meteora DLMM swap event.
pub use dex::MeteoraDlmmSwapDecoded;
/// Decoded Orca Whirlpools create-pool event.

View File

@@ -7,7 +7,8 @@
//! deterministic local pipeline over their signatures.
const LOCAL_PIPELINE_DEX_DECODER_SCOPE: &str = "dex_decode.local_pipeline";
const LOCAL_PIPELINE_DEX_DECODER_VERSION: &str = "dex_decode.v0.7.44.ledger1";
const LOCAL_PIPELINE_DEX_DECODER_VERSION: &str =
"dex_decode.v0.7.45.dlmm_add_liquidity_strategies1";
fn default_skip_certified_dex_decode() -> bool {
return true;
@@ -193,9 +194,11 @@ impl LocalPipelineReplayService {
signature = %signature,
"replaying local pipeline for persisted transaction"
);
let transaction_result =
crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature.as_str())
.await;
let transaction_result = crate::query_chain_transactions_get_by_signature(
self.database.as_ref(),
signature.as_str(),
)
.await;
let transaction = match transaction_result {
Ok(Some(transaction)) => transaction,
Ok(None) => {
@@ -260,9 +263,8 @@ impl LocalPipelineReplayService {
);
},
None => {
let decode_result = dex_decode
.decode_transaction_by_signature(signature.as_str())
.await;
let decode_result =
dex_decode.decode_transaction_by_signature(signature.as_str()).await;
match decode_result {
Ok(decoded_events) => {
result.decoded_event_count += decoded_events.len();
@@ -542,11 +544,8 @@ impl LocalPipelineReplayService {
signature: &str,
decoded_events: &[crate::DexDecodedEventDto],
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let ledger_result = build_success_dex_decode_replay_ledger(
transaction_id,
signature,
decoded_events,
);
let ledger_result =
build_success_dex_decode_replay_ledger(transaction_id, signature, decoded_events);
let ledger = match ledger_result {
Ok(ledger) => ledger,
Err(error) => return Err(error),
@@ -597,10 +596,23 @@ fn build_success_dex_decode_replay_ledger(
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert decoded event count '{}' to i64: {}",
decoded_events.len(), error
decoded_events.len(),
error
)));
},
};
let effective_event_count_usize = count_effective_decoded_events(decoded_events);
let effective_event_count_result = i64::try_from(effective_event_count_usize);
let effective_event_count = match effective_event_count_result {
Ok(effective_event_count) => effective_event_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert effective decoded event count '{}' to i64: {}",
effective_event_count_usize, error
)));
},
};
let instruction_audit_count = event_count - effective_event_count;
let distinct_token_mint_count_usize = count_distinct_decoded_event_token_mints(decoded_events);
let distinct_token_mint_count_result = i64::try_from(distinct_token_mint_count_usize);
let distinct_token_mint_count = match distinct_token_mint_count_result {
@@ -612,7 +624,7 @@ fn build_success_dex_decode_replay_ledger(
)));
},
};
let force_replay_required = event_count > 1 || distinct_token_mint_count > 2;
let force_replay_required = effective_event_count > 1 || distinct_token_mint_count > 2;
let decode_status = if event_count == 0 {
crate::DexDecodeReplayLedgerDto::STATUS_NO_EVENTS.to_string()
} else {
@@ -625,6 +637,8 @@ fn build_success_dex_decode_replay_ledger(
};
let status_reason = build_dex_decode_replay_ledger_status_reason(
event_count,
effective_event_count,
instruction_audit_count,
distinct_token_mint_count,
force_replay_required,
);
@@ -642,9 +656,22 @@ fn build_success_dex_decode_replay_ledger(
));
}
fn count_distinct_decoded_event_token_mints(
decoded_events: &[crate::DexDecodedEventDto],
) -> usize {
fn count_effective_decoded_events(decoded_events: &[crate::DexDecodedEventDto]) -> usize {
let mut count = 0_usize;
for event in decoded_events {
if is_instruction_audit_event(event) {
continue;
}
count += 1;
}
return count;
}
fn is_instruction_audit_event(event: &crate::DexDecodedEventDto) -> bool {
return event.event_kind.ends_with(".instruction_audit");
}
fn count_distinct_decoded_event_token_mints(decoded_events: &[crate::DexDecodedEventDto]) -> usize {
let mut mints = std::collections::BTreeSet::<std::string::String>::new();
for event in decoded_events {
insert_optional_mint(&mut mints, &event.lp_mint);
@@ -668,6 +695,8 @@ fn insert_optional_mint(
fn build_dex_decode_replay_ledger_status_reason(
event_count: i64,
effective_event_count: i64,
instruction_audit_count: i64,
distinct_token_mint_count: i64,
force_replay_required: bool,
) -> std::string::String {
@@ -676,11 +705,11 @@ fn build_dex_decode_replay_ledger_status_reason(
}
if force_replay_required {
return format!(
"decode completed but remains unsafe for skip: event_count={event_count}, distinct_token_mint_count={distinct_token_mint_count}"
"decode completed but remains unsafe for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
return format!(
"decode completed and certified for skip: event_count={event_count}, distinct_token_mint_count={distinct_token_mint_count}"
"decode completed and certified for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
@@ -692,3 +721,57 @@ pub async fn replay_local_pipeline(
let service = crate::LocalPipelineReplayService::new(database);
return service.replay_local_pipeline(config).await;
}
#[cfg(test)]
mod tests {
fn make_decoded_event(
event_kind: &str,
token_a_mint: std::option::Option<&str>,
token_b_mint: std::option::Option<&str>,
) -> crate::DexDecodedEventDto {
return crate::DexDecodedEventDto::new(
1,
Some(10),
"meteora_dlmm".to_string(),
crate::METEORA_DLMM_PROGRAM_ID.to_string(),
event_kind.to_string(),
Some("pool".to_string()),
None,
token_a_mint.map(|value| return value.to_string()),
token_b_mint.map(|value| return value.to_string()),
None,
"{}".to_string(),
);
}
#[test]
fn ledger_certifies_one_effective_event_with_instruction_audits() {
let events = vec![
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
];
let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice())
.expect("ledger must build");
assert_eq!(ledger.event_count, 3);
assert_eq!(ledger.distinct_token_mint_count, 2);
assert!(!ledger.force_replay_required);
assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE);
assert!(ledger.can_skip_decode());
}
#[test]
fn ledger_keeps_multiple_effective_events_unsafe() {
let events = vec![
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
];
let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice())
.expect("ledger must build");
assert_eq!(ledger.event_count, 3);
assert!(ledger.force_replay_required);
assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE);
assert!(!ledger.can_skip_decode());
}
}