Files
khadhroony-bobobot/kb_lib/src/local_pipeline_replay.rs
2026-06-09 10:13:03 +02:00

1066 lines
46 KiB
Rust

// file: kb_lib/src/local_pipeline_replay.rs
//! Local pipeline replay from already persisted raw transaction data.
//!
//! This service does not fetch historical transactions from Solana RPC. It
//! reuses rows already present in `k_sol_chain_transactions` and replays the
//! deterministic local pipeline over their signatures.
const LOCAL_PIPELINE_DEX_DECODER_SCOPE: &str = "dex_decode.local_pipeline";
const LOCAL_PIPELINE_DEX_DECODER_VERSION: &str = "dex_decode.v0.7.51.raydium_amm_v4_max_decoder";
fn default_skip_certified_dex_decode() -> bool {
return true;
}
fn default_defer_instruction_observation_index_refresh() -> bool {
return true;
}
/// Configuration for a local pipeline replay pass.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPipelineReplayConfig {
/// Maximum number of persisted transactions to replay.
pub limit: std::option::Option<i64>,
/// Whether token metadata backfill should run after local replay.
pub refresh_missing_token_metadata: bool,
/// Maximum number of missing token metadata rows to resolve.
pub token_metadata_limit: std::option::Option<i64>,
/// Whether locally replayed market materialization tables are reset before replay.
pub reset_market_materialization_before_replay: bool,
/// Whether DEX decoding may be skipped when the replay ledger certifies it is safe.
#[serde(default = "default_skip_certified_dex_decode")]
pub skip_certified_dex_decode: bool,
/// Whether DEX decoding must run even when the replay ledger certifies a safe prior pass.
#[serde(default)]
pub force_decode_replay: bool,
/// Whether instruction observation indexing is deferred and refreshed once after replay.
#[serde(default = "default_defer_instruction_observation_index_refresh")]
pub defer_instruction_observation_index_refresh: bool,
}
impl Default for LocalPipelineReplayConfig {
fn default() -> Self {
return Self {
limit: Some(10_000),
refresh_missing_token_metadata: false,
token_metadata_limit: Some(250),
reset_market_materialization_before_replay: true,
skip_certified_dex_decode: true,
force_decode_replay: false,
defer_instruction_observation_index_refresh: true,
};
}
}
/// Summary of a local pipeline replay pass.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPipelineReplayResult {
/// Number of transaction signatures selected for replay.
pub selected_transaction_count: usize,
/// Number of transactions replayed without a fatal per-signature error.
pub replayed_transaction_count: usize,
/// Number of transactions that produced a decode error.
pub decode_error_count: usize,
/// Number of transactions that produced a detect error.
pub detect_error_count: usize,
/// Number of transactions that produced a trade aggregation error.
pub trade_aggregation_error_count: usize,
/// Number of transactions that produced a non-trade materialization error.
pub non_trade_materialization_error_count: usize,
/// Number of transactions that produced a candle aggregation error.
pub pair_candle_error_count: usize,
/// Number of transactions that produced an analytic signal error.
pub analytic_signal_error_count: usize,
/// Total decoded events returned by replayed decode calls.
pub decoded_event_count: usize,
/// Number of transactions where DEX decoding was skipped through the replay ledger.
pub decode_skipped_count: usize,
/// Number of persisted decoded events covered by skipped decode ledger rows.
pub decode_skipped_event_count: usize,
/// Number of replay ledger rows upserted by this replay pass.
pub decode_ledger_upsert_count: usize,
/// Number of replay ledger rows marked unsafe for future decode skip.
pub decode_ledger_unsafe_count: usize,
/// Total detection results returned by replayed detect calls.
pub detection_count: usize,
/// Total trade aggregation results returned by replayed aggregation calls.
pub trade_event_count: usize,
/// Total liquidity event materialization results returned by replayed non-trade calls.
pub liquidity_event_count: usize,
/// Total pool lifecycle event materialization results returned by replayed non-trade calls.
pub pool_lifecycle_event_count: usize,
/// Total fee event materialization results returned by replayed non-trade calls.
pub fee_event_count: usize,
/// Total reward event materialization results returned by replayed non-trade calls.
pub reward_event_count: usize,
/// Total pool administration event materialization results returned by replayed non-trade calls.
pub pool_admin_event_count: usize,
/// Total orderbook event materialization results returned by replayed non-trade calls.
pub orderbook_event_count: usize,
/// Total token-account event materialization results returned by replayed non-trade calls.
pub token_account_event_count: usize,
/// Total candle upsert results returned by replayed candle calls.
///
/// This is a replay write/result counter, not the number of distinct rows
/// currently persisted in `k_sol_pair_candles`. Use local diagnostics for the
/// persisted row count.
pub pair_candle_upsert_count: usize,
/// Total analytic signal upsert results returned by replayed analytic calls.
///
/// This is a replay write/result counter, not the number of distinct rows
/// currently persisted in the analytic signal table.
pub analytic_signal_upsert_count: usize,
/// Total transaction classification rows upserted during replay.
pub transaction_classification_count: usize,
/// Number of transactions that produced a classification error.
pub transaction_classification_error_count: usize,
/// Number of token metadata rows updated after replay.
pub token_metadata_updated_count: usize,
/// Number of pair symbols updated after replay.
pub pair_symbol_updated_count: usize,
/// Number of derived market materialization rows deleted before replay.
pub reset_market_materialization_deleted_count: u64,
/// Total instruction source rows scanned by the observation index refresh.
pub instruction_observation_scanned_count: usize,
/// Total instruction-observation rows upserted by the observation index refresh.
pub instruction_observation_upserted_count: usize,
/// Number of errors outside per-signature replay.
pub global_error_count: usize,
}
/// Local pipeline replay service.
#[derive(Debug, Clone)]
pub struct LocalPipelineReplayService {
database: std::sync::Arc<crate::Database>,
http_pool: std::option::Option<std::sync::Arc<crate::HttpEndpointPool>>,
http_role: std::string::String,
}
impl LocalPipelineReplayService {
/// Creates a new local-only pipeline replay service.
pub fn new(database: std::sync::Arc<crate::Database>) -> Self {
return Self {
database,
http_pool: None,
http_role: "local_metadata".to_string(),
};
}
/// Creates a new pipeline replay service able to refresh token metadata over Solana HTTP RPC.
pub fn new_with_http_pool(
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
database: std::sync::Arc<crate::Database>,
http_role: std::string::String,
) -> Self {
return Self {
database,
http_pool: Some(http_pool),
http_role,
};
}
/// Replays the local pipeline from persisted raw chain transaction rows.
pub async fn replay_local_pipeline(
&self,
config: &crate::LocalPipelineReplayConfig,
) -> Result<crate::LocalPipelineReplayResult, crate::Error> {
let signatures_result = crate::query_chain_transactions_list_signatures_for_replay(
self.database.as_ref(),
config.limit,
)
.await;
let signatures = match signatures_result {
Ok(signatures) => signatures,
Err(error) => return Err(error),
};
let mut reset_market_materialization_deleted_count = 0_u64;
if config.reset_market_materialization_before_replay {
let reset_result =
crate::query_trade_market_materialization_delete_all(self.database.as_ref()).await;
reset_market_materialization_deleted_count = match reset_result {
Ok(deleted_count) => deleted_count,
Err(error) => return Err(error),
};
tracing::debug!(
deleted_count = reset_market_materialization_deleted_count,
"local pipeline replay reset market materialization tables"
);
}
let dex_decode = crate::DexDecodeService::new(self.database.clone());
let dex_detect = crate::DexDetectService::new(self.database.clone());
let trade_aggregation = crate::TradeAggregationService::new(self.database.clone());
let non_trade_materialization =
crate::NonTradeEventMaterializationService::new(self.database.clone());
let pair_candle_aggregation =
crate::PairCandleAggregationService::new(self.database.clone());
let pair_analytic_signal = crate::PairAnalyticSignalService::new(self.database.clone());
let transaction_classification =
crate::TransactionClassificationService::new(self.database.clone());
let instruction_observation_index =
crate::InstructionObservationIndexService::new(self.database.clone());
let mut result = LocalPipelineReplayResult {
selected_transaction_count: signatures.len(),
reset_market_materialization_deleted_count,
..Default::default()
};
for signature in signatures {
tracing::debug!(
signature = %signature,
"replaying local pipeline for persisted transaction"
);
let transaction_result = crate::query_chain_transactions_get_by_signature(
self.database.as_ref(),
signature.as_str(),
)
.await;
let transaction = match transaction_result {
Ok(Some(transaction)) => transaction,
Ok(None) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
"local pipeline replay transaction row disappeared before replay"
);
continue;
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay transaction lookup failed"
);
continue;
},
};
let transaction_id = match transaction.id {
Some(transaction_id) => transaction_id,
None => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
"local pipeline replay transaction row has no persisted id"
);
continue;
},
};
let decode_skip_ledger_result = self
.get_certified_dex_decode_skip_ledger(config, transaction_id, signature.as_str())
.await;
let decode_skip_ledger = match decode_skip_ledger_result {
Ok(decode_skip_ledger) => decode_skip_ledger,
Err(error) => return Err(error),
};
match decode_skip_ledger {
Some(ledger) => {
result.decode_skipped_count += 1;
let ledger_event_count = usize::try_from(ledger.event_count);
match ledger_event_count {
Ok(event_count) => {
result.decode_skipped_event_count += event_count;
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
event_count = ledger.event_count,
error = %error,
"local pipeline replay could not convert skipped event count"
);
},
}
tracing::debug!(
signature = %signature,
event_count = ledger.event_count,
decoder_version = %ledger.decoder_version,
"local pipeline replay skipped certified DEX decode step"
);
},
None => {
let replay_scope_delete_result =
crate::query_dex_decoded_events_delete_local_replay_scope_by_transaction_id(
self.database.as_ref(),
transaction_id,
)
.await;
match replay_scope_delete_result {
Ok(deleted_count) => {
result.reset_market_materialization_deleted_count = result
.reset_market_materialization_deleted_count
.saturating_add(deleted_count);
if deleted_count > 0 {
tracing::debug!(
signature = %signature,
transaction_id,
deleted_count,
"local pipeline replay deleted stale local DEX replay scope before decode"
);
}
},
Err(error) => return Err(error),
}
let decode_result =
dex_decode.decode_transaction_by_signature(signature.as_str()).await;
match decode_result {
Ok(decoded_events) => {
result.decoded_event_count += decoded_events.len();
let ledger_result = self
.record_dex_decode_replay_ledger(
transaction_id,
signature.as_str(),
&decoded_events,
)
.await;
match ledger_result {
Ok(ledger) => {
result.decode_ledger_upsert_count += 1;
if ledger.force_replay_required {
result.decode_ledger_unsafe_count += 1;
}
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay could not record successful decode ledger row"
);
},
}
},
Err(error) => {
result.decode_error_count += 1;
let ledger_result = self
.record_failed_dex_decode_replay_ledger(
transaction_id,
signature.as_str(),
error.to_string(),
)
.await;
match ledger_result {
Ok(_) => {
result.decode_ledger_upsert_count += 1;
result.decode_ledger_unsafe_count += 1;
},
Err(ledger_error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %ledger_error,
"local pipeline replay could not record failed decode ledger row"
);
},
}
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay decode step failed"
);
continue;
},
}
},
}
let detect_result =
dex_detect.detect_transaction_by_signature(signature.as_str()).await;
match detect_result {
Ok(detections) => {
result.detection_count += detections.len();
},
Err(error) => {
result.detect_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay detect step failed; continuing with aggregation"
);
},
}
let non_trade_result = non_trade_materialization
.record_transaction_by_signature(signature.as_str())
.await;
match non_trade_result {
Ok(non_trade_result) => {
result.liquidity_event_count += non_trade_result.liquidity_event_count;
result.pool_lifecycle_event_count +=
non_trade_result.pool_lifecycle_event_count;
result.fee_event_count += non_trade_result.fee_event_count;
result.reward_event_count += non_trade_result.reward_event_count;
result.pool_admin_event_count += non_trade_result.pool_admin_event_count;
result.orderbook_event_count += non_trade_result.orderbook_event_count;
result.token_account_event_count += non_trade_result.token_account_event_count;
},
Err(error) => {
result.non_trade_materialization_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay non-trade materialization step failed"
);
continue;
},
}
let trade_result =
trade_aggregation.record_transaction_by_signature(signature.as_str()).await;
match trade_result {
Ok(trade_results) => {
result.trade_event_count += trade_results.len();
},
Err(error) => {
result.trade_aggregation_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay trade aggregation step failed"
);
continue;
},
}
let candle_result = pair_candle_aggregation
.record_transaction_by_signature(signature.as_str())
.await;
match candle_result {
Ok(candle_results) => {
result.pair_candle_upsert_count += candle_results.len();
},
Err(error) => {
result.pair_candle_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay candle aggregation step failed"
);
},
}
let analytic_result =
pair_analytic_signal.record_transaction_by_signature(signature.as_str()).await;
match analytic_result {
Ok(analytic_results) => {
result.analytic_signal_upsert_count += analytic_results.len();
},
Err(error) => {
result.analytic_signal_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay analytic signal step failed"
);
},
}
let classification_result = transaction_classification
.classify_transaction_by_signature(signature.as_str())
.await;
match classification_result {
Ok(_) => {
result.transaction_classification_count += 1;
},
Err(error) => {
result.transaction_classification_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay transaction classification step failed"
);
},
}
if !config.defer_instruction_observation_index_refresh {
let instruction_index_result =
instruction_observation_index.refresh_signature(signature.as_str()).await;
match instruction_index_result {
Ok(index_result) => {
result.instruction_observation_scanned_count +=
index_result.scanned_instruction_count;
result.instruction_observation_upserted_count +=
index_result.upserted_observation_count;
tracing::debug!(
signature = %signature,
upserted_observation_count = index_result.upserted_observation_count,
"instruction observation index refreshed during local replay"
);
},
Err(error) => {
tracing::warn!(
signature = %signature,
error = %error,
"instruction observation index refresh failed during local replay"
);
},
}
}
result.replayed_transaction_count += 1;
}
if config.defer_instruction_observation_index_refresh {
let instruction_index_result =
instruction_observation_index.refresh_replay_window(config.limit).await;
match instruction_index_result {
Ok(index_result) => {
result.instruction_observation_scanned_count +=
index_result.scanned_instruction_count;
result.instruction_observation_upserted_count +=
index_result.upserted_observation_count;
tracing::debug!(
scanned_instruction_count = index_result.scanned_instruction_count,
upserted_observation_count = index_result.upserted_observation_count,
"instruction observation index refreshed after local replay"
);
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
error = %error,
"instruction observation index refresh failed after local replay"
);
},
}
}
if config.refresh_missing_token_metadata {
let metadata_service = match &self.http_pool {
Some(http_pool) => crate::TokenMetadataBackfillService::new(
http_pool.clone(),
self.database.clone(),
self.http_role.clone(),
),
None => crate::TokenMetadataBackfillService::new_local(self.database.clone()),
};
let metadata_result = metadata_service
.backfill_missing_token_metadata(config.token_metadata_limit)
.await;
match metadata_result {
Ok(metadata_result) => {
result.token_metadata_updated_count += metadata_result.updated_token_count;
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
error = %error,
"token metadata refresh failed after local pipeline replay"
);
},
}
}
self.refresh_event_coverage_best_effort().await;
return Ok(result);
}
async fn refresh_event_coverage_best_effort(&self) {
let cpmm_cleanup_result =
crate::query_dex_decoded_events_delete_replaced_raydium_cpmm_instruction_audits(
self.database.as_ref(),
None,
)
.await;
match cpmm_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"replaced Raydium CPMM instruction audits cleaned before dex event coverage refresh"
);
}
},
Err(error) => {
tracing::warn!(
error = %error,
"Raydium CPMM replaced instruction-audit cleanup failed before dex event coverage refresh"
);
},
}
let cleanup_result =
crate::query_dex_decoded_events_delete_replaced_raydium_clmm_instruction_audits(
self.database.as_ref(),
None,
)
.await;
match cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"replaced Raydium CLMM instruction audits cleaned before dex event coverage refresh"
);
}
},
Err(error) => {
tracing::warn!(
error = %error,
"Raydium CLMM replaced instruction-audit cleanup failed before dex event coverage refresh"
);
},
}
let upstream_cleanup_result =
crate::query_dex_decoded_events_delete_locally_covered_upstream_instruction_matches(
self.database.as_ref(),
None,
)
.await;
match upstream_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"locally covered upstream instruction matches cleaned before dex event coverage refresh"
);
}
},
Err(error) => {
tracing::warn!(
error = %error,
"locally covered upstream instruction-match cleanup failed before dex event coverage refresh"
);
},
}
let coverage_service = crate::DexEventCoverageService::new(self.database.clone());
let refresh_result = coverage_service.refresh_local_counts(None).await;
match refresh_result {
Ok(refresh_result) => {
tracing::debug!(
upserted_entry_count = refresh_result.upserted_entry_count,
refreshed_entry_count = refresh_result.refreshed_entry_count,
summary_count = refresh_result.summaries.len(),
"dex event coverage refreshed after local pipeline replay"
);
},
Err(error) => {
tracing::warn!(
error = %error,
"dex event coverage refresh failed after local pipeline replay"
);
},
}
let post_refresh_cpmm_cleanup_result =
crate::query_dex_decoded_events_delete_replaced_raydium_cpmm_instruction_audits(
self.database.as_ref(),
None,
)
.await;
match post_refresh_cpmm_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"replaced Raydium CPMM instruction audits cleaned after dex event coverage refresh"
);
let second_refresh_result = coverage_service.refresh_local_counts(None).await;
match second_refresh_result {
Ok(second_refresh_result) => {
tracing::debug!(
upserted_entry_count = second_refresh_result.upserted_entry_count,
refreshed_entry_count = second_refresh_result.refreshed_entry_count,
summary_count = second_refresh_result.summaries.len(),
"dex event coverage refreshed after Raydium CPMM instruction-audit cleanup"
);
},
Err(error) => {
tracing::warn!(
error = %error,
"dex event coverage refresh failed after Raydium CPMM instruction-audit cleanup"
);
},
}
}
},
Err(error) => {
tracing::warn!(
error = %error,
"Raydium CPMM replaced instruction-audit cleanup failed after dex event coverage refresh"
);
},
}
let post_refresh_upstream_cleanup_result =
crate::query_dex_decoded_events_delete_locally_covered_upstream_instruction_matches(
self.database.as_ref(),
None,
)
.await;
match post_refresh_upstream_cleanup_result {
Ok(deleted_count) => {
if deleted_count > 0 {
tracing::info!(
deleted_count = deleted_count,
"locally covered upstream instruction matches cleaned after dex event coverage refresh"
);
let second_refresh_result = coverage_service.refresh_local_counts(None).await;
match second_refresh_result {
Ok(second_refresh_result) => {
tracing::debug!(
upserted_entry_count = second_refresh_result.upserted_entry_count,
refreshed_entry_count = second_refresh_result.refreshed_entry_count,
summary_count = second_refresh_result.summaries.len(),
"dex event coverage refreshed after upstream instruction-match cleanup"
);
},
Err(error) => {
tracing::warn!(
error = %error,
"dex event coverage refresh failed after upstream instruction-match cleanup"
);
},
}
}
},
Err(error) => {
tracing::warn!(
error = %error,
"locally covered upstream instruction-match cleanup failed after dex event coverage refresh"
);
},
}
}
async fn get_certified_dex_decode_skip_ledger(
&self,
config: &crate::LocalPipelineReplayConfig,
transaction_id: i64,
signature: &str,
) -> Result<std::option::Option<crate::DexDecodeReplayLedgerDto>, crate::Error> {
if config.force_decode_replay {
return Ok(None);
}
if !config.skip_certified_dex_decode {
return Ok(None);
}
let ledger_result = crate::query_dex_decode_replay_ledger_get_by_transaction(
self.database.as_ref(),
transaction_id,
LOCAL_PIPELINE_DEX_DECODER_SCOPE,
LOCAL_PIPELINE_DEX_DECODER_VERSION,
)
.await;
let ledger_option = match ledger_result {
Ok(ledger_option) => ledger_option,
Err(error) => return Err(error),
};
match ledger_option {
Some(ledger) => {
if ledger.can_skip_decode() {
let persisted_count_result = self
.count_persisted_decoded_events_for_skip(transaction_id, signature)
.await;
let persisted_count = match persisted_count_result {
Ok(persisted_count) => persisted_count,
Err(error) => return Err(error),
};
if persisted_count >= ledger.event_count {
return Ok(Some(ledger));
}
tracing::debug!(
signature = %signature,
ledger_event_count = ledger.event_count,
persisted_event_count = persisted_count,
"local pipeline replay ledger is certified but persisted decoded events are missing"
);
return Ok(None);
}
tracing::debug!(
signature = %signature,
decode_status = %ledger.decode_status,
certainty = %ledger.certainty,
force_replay_required = ledger.force_replay_required,
"local pipeline replay ledger requires DEX decode"
);
return Ok(None);
},
None => return Ok(None),
}
}
async fn count_persisted_decoded_events_for_skip(
&self,
transaction_id: i64,
signature: &str,
) -> Result<i64, crate::Error> {
let events_result = crate::query_dex_decoded_events_list_by_transaction_id(
self.database.as_ref(),
transaction_id,
)
.await;
let events = match events_result {
Ok(events) => events,
Err(error) => return Err(error),
};
let count_result = i64::try_from(events.len());
match count_result {
Ok(count) => return Ok(count),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert persisted decoded event count for signature '{}' to i64: {}",
signature, error
)));
},
}
}
async fn record_dex_decode_replay_ledger(
&self,
transaction_id: i64,
signature: &str,
decoded_events: &[crate::DexDecodedEventDto],
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let ledger_result =
build_success_dex_decode_replay_ledger(transaction_id, signature, decoded_events);
let ledger = match ledger_result {
Ok(ledger) => ledger,
Err(error) => return Err(error),
};
let upsert_result =
crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await;
match upsert_result {
Ok(_) => return Ok(ledger),
Err(error) => return Err(error),
}
}
async fn record_failed_dex_decode_replay_ledger(
&self,
transaction_id: i64,
signature: &str,
error_message: std::string::String,
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let ledger = crate::DexDecodeReplayLedgerDto::new(
transaction_id,
signature.to_string(),
LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(),
LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(),
crate::DexDecodeReplayLedgerDto::STATUS_FAILED.to_string(),
crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string(),
0,
0,
true,
Some(format!("decode failed: {error_message}")),
);
let upsert_result =
crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await;
match upsert_result {
Ok(_) => return Ok(ledger),
Err(error) => return Err(error),
}
}
}
fn build_success_dex_decode_replay_ledger(
transaction_id: i64,
signature: &str,
decoded_events: &[crate::DexDecodedEventDto],
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let event_count_result = i64::try_from(decoded_events.len());
let event_count = match event_count_result {
Ok(event_count) => event_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert decoded event count '{}' to i64: {}",
decoded_events.len(),
error
)));
},
};
let effective_event_count_usize = count_effective_decoded_events(decoded_events);
let effective_event_count_result = i64::try_from(effective_event_count_usize);
let effective_event_count = match effective_event_count_result {
Ok(effective_event_count) => effective_event_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert effective decoded event count '{}' to i64: {}",
effective_event_count_usize, error
)));
},
};
let instruction_audit_count = event_count - effective_event_count;
let distinct_token_mint_count_usize = count_distinct_decoded_event_token_mints(decoded_events);
let distinct_token_mint_count_result = i64::try_from(distinct_token_mint_count_usize);
let distinct_token_mint_count = match distinct_token_mint_count_result {
Ok(distinct_token_mint_count) => distinct_token_mint_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert distinct token mint count '{}' to i64: {}",
distinct_token_mint_count_usize, error
)));
},
};
let force_replay_required = effective_event_count > 1 || distinct_token_mint_count > 2;
let decode_status = if event_count == 0 {
crate::DexDecodeReplayLedgerDto::STATUS_NO_EVENTS.to_string()
} else {
crate::DexDecodeReplayLedgerDto::STATUS_DECODED.to_string()
};
let certainty = if force_replay_required {
crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string()
} else {
crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE.to_string()
};
let status_reason = build_dex_decode_replay_ledger_status_reason(
event_count,
effective_event_count,
instruction_audit_count,
distinct_token_mint_count,
force_replay_required,
);
return Ok(crate::DexDecodeReplayLedgerDto::new(
transaction_id,
signature.to_string(),
LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(),
LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(),
decode_status,
certainty,
event_count,
distinct_token_mint_count,
force_replay_required,
Some(status_reason),
));
}
fn count_effective_decoded_events(decoded_events: &[crate::DexDecodedEventDto]) -> usize {
let mut count = 0_usize;
for event in decoded_events {
if is_replay_audit_only_event(event) {
continue;
}
count += 1;
}
return count;
}
fn is_replay_audit_only_event(event: &crate::DexDecodedEventDto) -> bool {
if event.event_kind.ends_with(".instruction_audit") {
return true;
}
if event.event_kind.ends_with("_audit") {
return true;
}
if event.event_kind == crate::UPSTREAM_REGISTRY_INSTRUCTION_MATCH_EVENT_KIND {
return true;
}
return false;
}
fn count_distinct_decoded_event_token_mints(decoded_events: &[crate::DexDecodedEventDto]) -> usize {
let mut mints = std::collections::BTreeSet::<std::string::String>::new();
for event in decoded_events {
insert_optional_mint(&mut mints, &event.lp_mint);
insert_optional_mint(&mut mints, &event.token_a_mint);
insert_optional_mint(&mut mints, &event.token_b_mint);
}
return mints.len();
}
fn insert_optional_mint(
mints: &mut std::collections::BTreeSet<std::string::String>,
mint_option: &std::option::Option<std::string::String>,
) {
if let Some(mint) = mint_option {
let trimmed = mint.trim();
if !trimmed.is_empty() {
mints.insert(trimmed.to_string());
}
}
}
fn build_dex_decode_replay_ledger_status_reason(
event_count: i64,
effective_event_count: i64,
instruction_audit_count: i64,
distinct_token_mint_count: i64,
force_replay_required: bool,
) -> std::string::String {
if event_count == 0 {
return "decode completed with no persisted DEX event".to_string();
}
if force_replay_required {
return format!(
"decode completed but remains unsafe for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
return format!(
"decode completed and certified for skip: event_count={event_count}, effective_event_count={effective_event_count}, instruction_audit_count={instruction_audit_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
/// Replays the local pipeline from persisted raw chain transaction rows.
pub async fn replay_local_pipeline(
database: std::sync::Arc<crate::Database>,
config: &crate::LocalPipelineReplayConfig,
) -> Result<crate::LocalPipelineReplayResult, crate::Error> {
let service = crate::LocalPipelineReplayService::new(database);
return service.replay_local_pipeline(config).await;
}
#[cfg(test)]
mod tests {
fn make_decoded_event(
event_kind: &str,
token_a_mint: std::option::Option<&str>,
token_b_mint: std::option::Option<&str>,
) -> crate::DexDecodedEventDto {
return crate::DexDecodedEventDto::new(
1,
Some(10),
"meteora_dlmm".to_string(),
crate::METEORA_DLMM_PROGRAM_ID.to_string(),
event_kind.to_string(),
Some("pool".to_string()),
None,
token_a_mint.map(|value| return value.to_string()),
token_b_mint.map(|value| return value.to_string()),
None,
"{}".to_string(),
);
}
#[test]
fn ledger_certifies_one_effective_event_with_instruction_audits() {
let events = vec![
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
];
let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice())
.expect("ledger must build");
assert_eq!(ledger.event_count, 3);
assert_eq!(ledger.distinct_token_mint_count, 2);
assert!(!ledger.force_replay_required);
assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE);
assert!(ledger.can_skip_decode());
}
#[test]
fn ledger_treats_audit_suffix_events_as_audit_only() {
let events = vec![
make_decoded_event("openbook_v2.settle_funds_audit", Some("mint-a"), Some("mint-b")),
make_decoded_event("openbook_v2.order_place_audit", None, None),
];
let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice())
.expect("ledger must build");
assert_eq!(ledger.event_count, 2);
assert_eq!(
ledger.status_reason.as_deref(),
Some(
"decode completed and certified for skip: event_count=2, effective_event_count=0, instruction_audit_count=2, distinct_token_mint_count=2"
)
);
assert!(!ledger.force_replay_required);
assert!(ledger.can_skip_decode());
}
#[test]
fn ledger_keeps_multiple_effective_events_unsafe() {
let events = vec![
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.swap", Some("mint-a"), Some("mint-b")),
make_decoded_event("meteora_dlmm.instruction_audit", None, None),
];
let ledger = super::build_success_dex_decode_replay_ledger(1, "sig", events.as_slice())
.expect("ledger must build");
assert_eq!(ledger.event_count, 3);
assert!(ledger.force_replay_required);
assert_eq!(ledger.certainty, crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE);
assert!(!ledger.can_skip_decode());
}
}