0.7.48-pre

This commit is contained in:
2026-05-31 19:23:46 +02:00
parent 8b09e82b3b
commit abb810d544
20 changed files with 2864 additions and 348 deletions

View File

@@ -0,0 +1,473 @@
// file: kb_lib/src/dex_event_coverage.rs
//! Event coverage synchronization and reporting service.
//!
//! This service bridges the read-only upstream registry and the persisted
//! coverage table. It does not decode transactions and never materializes
//! trades, metrics or candles.
/// Result of one event coverage synchronization pass.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DexEventCoverageSyncResult {
/// Optional decoder filter used for this synchronization pass.
pub decoder_code: std::option::Option<std::string::String>,
/// Number of upstream registry entries selected by the filter.
pub upstream_entry_count: usize,
/// Number of coverage rows upserted from the upstream registry.
pub upserted_entry_count: usize,
/// Number of coverage rows touched by the local observation refresh.
pub refreshed_entry_count: u64,
/// Aggregated coverage summaries after synchronization.
pub summaries: std::vec::Vec<crate::DexEventCoverageSummaryDto>,
}
/// Service used to persist and refresh DEX event coverage rows.
#[derive(Debug, Clone)]
pub struct DexEventCoverageService {
database: std::sync::Arc<crate::Database>,
upstream_registry: crate::UpstreamRegistryService,
}
impl DexEventCoverageService {
/// Creates a new event coverage service.
pub fn new(database: std::sync::Arc<crate::Database>) -> Self {
return Self {
database,
upstream_registry: crate::UpstreamRegistryService::new(),
};
}
/// Synchronizes static upstream registry entries into SQLite coverage rows.
///
/// The resulting rows are still discovery/audit metadata. A row can become
/// observed or materialized only through local corpus replay and explicit
/// count refreshes.
pub async fn sync_upstream_registry(
&self,
decoder_code: std::option::Option<std::string::String>,
) -> Result<crate::DexEventCoverageSyncResult, crate::Error> {
let request = crate::UpstreamRegistrySearchRequestDto {
decoder_code: decoder_code.clone(),
program_id: None,
program_family: None,
surface_kind: None,
entry_kind: None,
proof_status: None,
limit: None,
};
let search_result = self.upstream_registry.search(&request);
let mut upserted_entry_count = 0_usize;
for entry in &search_result.entries {
let coverage_entry = build_coverage_entry_from_upstream(entry);
let upsert_result = crate::query_dex_event_coverage_entries_upsert(
self.database.as_ref(),
&coverage_entry,
)
.await;
match upsert_result {
Ok(_) => upserted_entry_count += 1,
Err(error) => return Err(error),
}
}
let refreshed_entry_count = match &decoder_code {
Some(decoder_code) => {
let refresh_result =
crate::query_dex_event_coverage_entries_refresh_local_counts_by_decoder(
self.database.as_ref(),
decoder_code.as_str(),
)
.await;
match refresh_result {
Ok(refreshed_entry_count) => refreshed_entry_count,
Err(error) => return Err(error),
}
},
None => {
let refresh_result = crate::query_dex_event_coverage_entries_refresh_local_counts(
self.database.as_ref(),
)
.await;
match refresh_result {
Ok(refreshed_entry_count) => refreshed_entry_count,
Err(error) => return Err(error),
}
},
};
let summaries_result =
crate::query_dex_event_coverage_entries_list_summary_by_decoder(self.database.as_ref())
.await;
let summaries = match summaries_result {
Ok(summaries) => summaries,
Err(error) => return Err(error),
};
return Ok(crate::DexEventCoverageSyncResult {
decoder_code,
upstream_entry_count: search_result.entries.len(),
upserted_entry_count,
refreshed_entry_count,
summaries,
});
}
/// Refreshes observed, materialized and proof-status counters from local DB rows.
pub async fn refresh_local_counts(
&self,
decoder_code: std::option::Option<std::string::String>,
) -> Result<crate::DexEventCoverageSyncResult, crate::Error> {
let refreshed_entry_count = match &decoder_code {
Some(decoder_code) => {
let refresh_result =
crate::query_dex_event_coverage_entries_refresh_local_counts_by_decoder(
self.database.as_ref(),
decoder_code.as_str(),
)
.await;
match refresh_result {
Ok(refreshed_entry_count) => refreshed_entry_count,
Err(error) => return Err(error),
}
},
None => {
let refresh_result = crate::query_dex_event_coverage_entries_refresh_local_counts(
self.database.as_ref(),
)
.await;
match refresh_result {
Ok(refreshed_entry_count) => refreshed_entry_count,
Err(error) => return Err(error),
}
},
};
let summaries_result =
crate::query_dex_event_coverage_entries_list_summary_by_decoder(self.database.as_ref())
.await;
let summaries = match summaries_result {
Ok(summaries) => summaries,
Err(error) => return Err(error),
};
return Ok(crate::DexEventCoverageSyncResult {
decoder_code,
upstream_entry_count: 0,
upserted_entry_count: 0,
refreshed_entry_count,
summaries,
});
}
}
fn build_coverage_entry_from_upstream(
entry: &crate::UpstreamRegistryEntryDto,
) -> crate::DexEventCoverageEntryDto {
let event_family = infer_event_family(entry.entry_name.as_str(), entry.entry_kind.as_str());
let expected_db_target =
infer_expected_db_target(event_family.as_deref(), entry.entry_kind.as_str());
let local_event_kind =
known_local_event_kind(entry.decoder_code.as_str(), entry.entry_name.as_str());
let mut coverage_entry = crate::DexEventCoverageEntryDto::from_upstream_registry_entry(
entry,
event_family,
expected_db_target,
local_event_kind.clone(),
);
if local_event_kind.is_some() && coverage_entry.observed_count == 0 {
coverage_entry.proof_status =
crate::PROOF_STATUS_UPSTREAM_GIT_MAPPED_UNVERIFIED.to_string();
}
return coverage_entry;
}
fn infer_expected_db_target(
event_family: std::option::Option<&str>,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if entry_kind == crate::ENTRY_KIND_PROGRAM || entry_kind == crate::ENTRY_KIND_ACCOUNT {
return Some(crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY.to_string());
}
let family = match event_family {
Some(family) => family,
None => {
return Some(
crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY.to_string(),
);
},
};
let target = match family {
"swap" => crate::DexEventCoverageEntryDto::DB_TARGET_TRADE_EVENTS,
"pool_create" => crate::DexEventCoverageEntryDto::DB_TARGET_POOL_LIFECYCLE_EVENTS,
"liquidity_add" => crate::DexEventCoverageEntryDto::DB_TARGET_LIQUIDITY_EVENTS,
"liquidity_remove" => crate::DexEventCoverageEntryDto::DB_TARGET_LIQUIDITY_EVENTS,
"position_open" => crate::DexEventCoverageEntryDto::DB_TARGET_POOL_LIFECYCLE_EVENTS,
"position_close" => crate::DexEventCoverageEntryDto::DB_TARGET_POOL_LIFECYCLE_EVENTS,
"fee" => crate::DexEventCoverageEntryDto::DB_TARGET_FEE_EVENTS,
"reward" => crate::DexEventCoverageEntryDto::DB_TARGET_REWARD_EVENTS,
"admin_config" => crate::DexEventCoverageEntryDto::DB_TARGET_POOL_ADMIN_EVENTS,
"mint" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_MINT_EVENTS,
"burn" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_BURN_EVENTS,
"transfer" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_TRANSFER_EVENTS,
"account_create" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_ACCOUNT_EVENTS,
"account_close" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_ACCOUNT_EVENTS,
"wrap_sol" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_ACCOUNT_EVENTS,
"unwrap_sol" => crate::DexEventCoverageEntryDto::DB_TARGET_TOKEN_ACCOUNT_EVENTS,
"order_place" => crate::DexEventCoverageEntryDto::DB_TARGET_ORDERBOOK_EVENTS,
"order_cancel" => crate::DexEventCoverageEntryDto::DB_TARGET_ORDERBOOK_EVENTS,
"order_fill" => crate::DexEventCoverageEntryDto::DB_TARGET_ORDERBOOK_EVENTS,
"consume_events" => crate::DexEventCoverageEntryDto::DB_TARGET_ORDERBOOK_EVENTS,
"settle_funds" => crate::DexEventCoverageEntryDto::DB_TARGET_ORDERBOOK_EVENTS,
"vault_deposit" => crate::DexEventCoverageEntryDto::DB_TARGET_VAULT_EVENTS,
"vault_withdraw" => crate::DexEventCoverageEntryDto::DB_TARGET_VAULT_EVENTS,
"lock" => crate::DexEventCoverageEntryDto::DB_TARGET_LIQUIDITY_LOCK_EVENTS,
"unlock" => crate::DexEventCoverageEntryDto::DB_TARGET_LIQUIDITY_LOCK_EVENTS,
"launch" => crate::DexEventCoverageEntryDto::DB_TARGET_LAUNCH_EVENTS,
"migration" => crate::DexEventCoverageEntryDto::DB_TARGET_LAUNCH_EVENTS,
"stake" => crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY,
"unstake" => crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY,
_ => crate::DexEventCoverageEntryDto::DB_TARGET_DECODED_EVENTS_ONLY,
};
return Some(target.to_string());
}
fn infer_event_family(
entry_name: &str,
entry_kind: &str,
) -> std::option::Option<std::string::String> {
if entry_kind == crate::ENTRY_KIND_PROGRAM {
return None;
}
let normalized = entry_name.to_ascii_lowercase();
if contains_any(normalized.as_str(), &["swap", "buy", "sell", "trade"]) {
return Some("swap".to_string());
}
if contains_any(normalized.as_str(), &["create_pool", "initialize_pool", "initialize2"])
|| normalized == "initialize"
|| normalized.starts_with("initialize_")
{
return Some("pool_create".to_string());
}
if contains_any(normalized.as_str(), &["add_liquidity", "increase_liquidity", "deposit"])
|| normalized.contains("bootstrap_liquidity")
{
return Some("liquidity_add".to_string());
}
if contains_any(normalized.as_str(), &["remove_liquidity", "decrease_liquidity", "withdraw"])
&& !normalized.contains("funds")
{
return Some("liquidity_remove".to_string());
}
if contains_any(
normalized.as_str(),
&["open_position", "initialize_position", "position_create"],
) {
return Some("position_open".to_string());
}
if contains_any(normalized.as_str(), &["close_position", "position_close"])
|| normalized.contains("close_position_if_empty")
{
return Some("position_close".to_string());
}
if contains_any(normalized.as_str(), &["fee", "collect", "claim_fee"])
&& !normalized.contains("reward")
{
return Some("fee".to_string());
}
if normalized.contains("reward") {
return Some("reward".to_string());
}
if contains_any(
normalized.as_str(),
&["config", "admin", "authority", "permission", "pause", "status", "update_pool"],
) {
return Some("admin_config".to_string());
}
if normalized.contains("mint") {
return Some("mint".to_string());
}
if normalized.contains("burn") {
return Some("burn".to_string());
}
if normalized.contains("transfer") {
return Some("transfer".to_string());
}
if contains_any(normalized.as_str(), &["create_ata", "init_account", "open_orders_create"]) {
return Some("account_create".to_string());
}
if contains_any(normalized.as_str(), &["close_account", "close_open_orders"])
|| normalized.starts_with("close_")
{
return Some("account_close".to_string());
}
if normalized.contains("wrap_sol") {
return Some("wrap_sol".to_string());
}
if normalized.contains("unwrap_sol") {
return Some("unwrap_sol".to_string());
}
if normalized.contains("place_order") || normalized.contains("post_order") {
return Some("order_place".to_string());
}
if normalized.contains("cancel_order") || normalized.contains("cancel_all") {
return Some("order_cancel".to_string());
}
if normalized.contains("fill") {
return Some("order_fill".to_string());
}
if normalized.contains("consume_events") {
return Some("consume_events".to_string());
}
if normalized.contains("settle_funds") {
return Some("settle_funds".to_string());
}
if normalized.contains("vault") && normalized.contains("deposit") {
return Some("vault_deposit".to_string());
}
if normalized.contains("vault") && normalized.contains("withdraw") {
return Some("vault_withdraw".to_string());
}
if contains_any(normalized.as_str(), &["lock_liquidity", "create_lock", "lock"])
&& !normalized.contains("unlock")
{
return Some("lock".to_string());
}
if normalized.contains("unlock") {
return Some("unlock".to_string());
}
if contains_any(normalized.as_str(), &["launch", "create_bonding", "bonding_curve"]) {
return Some("launch".to_string());
}
if contains_any(normalized.as_str(), &["migrate", "migration", "graduate"]) {
return Some("migration".to_string());
}
if normalized.contains("unstake") {
return Some("unstake".to_string());
}
if normalized.contains("stake") {
return Some("stake".to_string());
}
return Some("unknown".to_string());
}
fn contains_any(value: &str, needles: &[&str]) -> bool {
for needle in needles {
if value.contains(needle) {
return true;
}
}
return false;
}
fn known_local_event_kind(
decoder_code: &str,
entry_name: &str,
) -> std::option::Option<std::string::String> {
match (decoder_code, entry_name) {
("raydium-cpmm", "swap_base_input") => {
return Some("raydium_cpmm.swap_base_input".to_string());
},
("raydium-cpmm", "swap_base_output") => {
return Some("raydium_cpmm.swap_base_output".to_string());
},
("raydium-cpmm", "collect_creator_fee") => {
return Some("raydium_cpmm.collect_creator_fee".to_string());
},
("raydium-cpmm", "withdraw") => return Some("raydium_cpmm.withdraw".to_string()),
("raydium-cpmm", "initialize") => return Some("raydium_cpmm.initialize".to_string()),
("raydium-clmm", "swap") => return Some("raydium_clmm.swap".to_string()),
("raydium-clmm", "swap_v2") => return Some("raydium_clmm.swap_v2".to_string()),
("raydium-clmm", "increase_liquidity_v2") => {
return Some("raydium_clmm.increase_liquidity_v2".to_string());
},
("raydium-clmm", "decrease_liquidity_v2") => {
return Some("raydium_clmm.decrease_liquidity_v2".to_string());
},
("raydium-clmm", "open_position_with_token22_nft") => {
return Some("raydium_clmm.open_position_with_token22_nft".to_string());
},
("raydium-clmm", "close_position") => {
return Some("raydium_clmm.close_position".to_string());
},
_ => return None,
}
}
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc<crate::Database> {
let tempdir_result = tempfile::tempdir();
let tempdir = match tempdir_result {
Ok(tempdir) => tempdir,
Err(error) => panic!("tempdir must succeed: {}", error),
};
let database_path = tempdir.path().join("dex_event_coverage.sqlite3");
let config = crate::DatabaseConfig {
enabled: true,
backend: crate::DatabaseBackend::Sqlite,
sqlite: crate::SqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database_result = crate::Database::connect_and_initialize(&config).await;
let database = match database_result {
Ok(database) => database,
Err(error) => panic!("database init must succeed: {}", error),
};
return std::sync::Arc::new(database);
}
#[test]
fn event_family_inference_covers_raydium_cpmm_core_entries() {
assert_eq!(
super::infer_event_family("swap_base_input", crate::ENTRY_KIND_INSTRUCTION),
Some("swap".to_string())
);
assert_eq!(
super::infer_event_family("initialize", crate::ENTRY_KIND_INSTRUCTION),
Some("pool_create".to_string())
);
assert_eq!(
super::infer_event_family("withdraw", crate::ENTRY_KIND_INSTRUCTION),
Some("liquidity_remove".to_string())
);
assert_eq!(
super::infer_event_family("collect_creator_fee", crate::ENTRY_KIND_INSTRUCTION),
Some("fee".to_string())
);
}
#[tokio::test]
async fn sync_upstream_registry_persists_raydium_cpmm_coverage_rows() {
let database = make_database().await;
let service = crate::DexEventCoverageService::new(database.clone());
let result = service.sync_upstream_registry(Some("raydium-cpmm".to_string())).await;
let result = match result {
Ok(result) => result,
Err(error) => panic!("coverage sync must succeed: {}", error),
};
assert!(result.upstream_entry_count > 0);
assert_eq!(result.upstream_entry_count, result.upserted_entry_count);
let rows_result = crate::query_dex_event_coverage_entries_list_by_decoder(
database.as_ref(),
"raydium-cpmm",
)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => panic!("coverage rows must load: {}", error),
};
assert_eq!(rows.len(), result.upstream_entry_count);
assert!(rows.iter().any(|row| return {
row.entry_name == "swap_base_input"
&& row.event_family == Some("swap".to_string())
&& row.local_event_kind == Some("raydium_cpmm.swap_base_input".to_string())
}));
assert!(rows.iter().any(|row| return {
row.entry_name == "deposit"
&& row.event_family == Some("liquidity_add".to_string())
&& row.local_event_kind.is_none()
}));
}
}