This commit is contained in:
2026-06-05 14:53:16 +02:00
parent 27e25d5bf4
commit f81e0f3bea
66 changed files with 7655 additions and 214 deletions

View File

@@ -13,6 +13,10 @@ fn default_skip_certified_dex_decode() -> bool {
return true;
}
fn default_defer_instruction_observation_index_refresh() -> bool {
return true;
}
/// Configuration for a local pipeline replay pass.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -31,6 +35,9 @@ pub struct LocalPipelineReplayConfig {
/// Whether DEX decoding must run even when the replay ledger certifies a safe prior pass.
#[serde(default)]
pub force_decode_replay: bool,
/// Whether instruction observation indexing is deferred and refreshed once after replay.
#[serde(default = "default_defer_instruction_observation_index_refresh")]
pub defer_instruction_observation_index_refresh: bool,
}
impl Default for LocalPipelineReplayConfig {
@@ -42,6 +49,7 @@ impl Default for LocalPipelineReplayConfig {
reset_market_materialization_before_replay: true,
skip_certified_dex_decode: true,
force_decode_replay: false,
defer_instruction_observation_index_refresh: true,
};
}
}
@@ -90,6 +98,8 @@ pub struct LocalPipelineReplayResult {
pub reward_event_count: usize,
/// Total pool administration event materialization results returned by replayed non-trade calls.
pub pool_admin_event_count: usize,
/// Total orderbook event materialization results returned by replayed non-trade calls.
pub orderbook_event_count: usize,
/// Total candle upsert results returned by replayed candle calls.
///
/// This is a replay write/result counter, not the number of distinct rows
@@ -111,6 +121,10 @@ pub struct LocalPipelineReplayResult {
pub pair_symbol_updated_count: usize,
/// Number of derived market materialization rows deleted before replay.
pub reset_market_materialization_deleted_count: u64,
/// Total instruction source rows scanned by the observation index refresh.
pub instruction_observation_scanned_count: usize,
/// Total instruction-observation rows upserted by the observation index refresh.
pub instruction_observation_upserted_count: usize,
/// Number of errors outside per-signature replay.
pub global_error_count: usize,
}
@@ -352,6 +366,7 @@ impl LocalPipelineReplayService {
result.fee_event_count += non_trade_result.fee_event_count;
result.reward_event_count += non_trade_result.reward_event_count;
result.pool_admin_event_count += non_trade_result.pool_admin_event_count;
result.orderbook_event_count += non_trade_result.orderbook_event_count;
},
Err(error) => {
result.non_trade_materialization_error_count += 1;
@@ -426,25 +441,55 @@ impl LocalPipelineReplayService {
);
},
}
if !config.defer_instruction_observation_index_refresh {
let instruction_index_result =
instruction_observation_index.refresh_signature(signature.as_str()).await;
match instruction_index_result {
Ok(index_result) => {
result.instruction_observation_scanned_count +=
index_result.scanned_instruction_count;
result.instruction_observation_upserted_count +=
index_result.upserted_observation_count;
tracing::debug!(
signature = %signature,
upserted_observation_count = index_result.upserted_observation_count,
"instruction observation index refreshed during local replay"
);
},
Err(error) => {
tracing::warn!(
signature = %signature,
error = %error,
"instruction observation index refresh failed during local replay"
);
},
}
}
result.replayed_transaction_count += 1;
}
if config.defer_instruction_observation_index_refresh {
let instruction_index_result =
instruction_observation_index.refresh_signature(signature.as_str()).await;
instruction_observation_index.refresh_replay_window(config.limit).await;
match instruction_index_result {
Ok(index_result) => {
result.instruction_observation_scanned_count +=
index_result.scanned_instruction_count;
result.instruction_observation_upserted_count +=
index_result.upserted_observation_count;
tracing::debug!(
signature = %signature,
scanned_instruction_count = index_result.scanned_instruction_count,
upserted_observation_count = index_result.upserted_observation_count,
"instruction observation index refreshed during local replay"
"instruction observation index refreshed after local replay"
);
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"instruction observation index refresh failed during local replay"
"instruction observation index refresh failed after local replay"
);
},
}
result.replayed_transaction_count += 1;
}
if config.refresh_missing_token_metadata {
let metadata_service = match &self.http_pool {
@@ -476,6 +521,52 @@ impl LocalPipelineReplayService {
}
async fn refresh_event_coverage_best_effort(&self) {
let cleanup_result =
crate::query_dex_decoded_events_delete_replaced_raydium_clmm_instruction_audits(
self.database.as_ref(),
None,
)
.await;
match cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"replaced Raydium CLMM instruction audits cleaned before dex event coverage refresh"
);
}
},
Err(error) => {
tracing::warn!(
error = %error,
"Raydium CLMM replaced instruction-audit cleanup failed before dex event coverage refresh"
);
},
}
let upstream_cleanup_result =
crate::query_dex_decoded_events_delete_locally_covered_upstream_instruction_matches(
self.database.as_ref(),
None,
)
.await;
match upstream_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"locally covered upstream instruction matches cleaned before dex event coverage refresh"
);
}
},
Err(error) => {
tracing::warn!(
error = %error,
"locally covered upstream instruction-match cleanup failed before dex event coverage refresh"
);
},
}
let coverage_service = crate::DexEventCoverageService::new(self.database.clone());
let refresh_result = coverage_service.refresh_local_counts(None).await;
match refresh_result {
@@ -494,6 +585,46 @@ impl LocalPipelineReplayService {
);
},
}
let post_refresh_upstream_cleanup_result =
crate::query_dex_decoded_events_delete_locally_covered_upstream_instruction_matches(
self.database.as_ref(),
None,
)
.await;
match post_refresh_upstream_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"locally covered upstream instruction matches cleaned after dex event coverage refresh"
);
let second_refresh_result = coverage_service.refresh_local_counts(None).await;
match second_refresh_result {
Ok(second_refresh_result) => {
tracing::debug!(
upserted_entry_count = second_refresh_result.upserted_entry_count,
refreshed_entry_count = second_refresh_result.refreshed_entry_count,
summary_count = second_refresh_result.summaries.len(),
"dex event coverage refreshed after upstream instruction-match cleanup"
);
},
Err(error) => {
tracing::warn!(
error = %error,
"dex event coverage refresh failed after upstream instruction-match cleanup"
);
},
}
}
},
Err(error) => {
tracing::warn!(
error = %error,
"locally covered upstream instruction-match cleanup failed after dex event coverage refresh"
);
},
}
}
async fn get_certified_dex_decode_skip_ledger(