This commit is contained in:
2026-05-27 18:45:16 +02:00
parent d9558a5c16
commit 96b6209482
18 changed files with 996 additions and 23 deletions

View File

@@ -6,6 +6,13 @@
//! reuses rows already present in `k_sol_chain_transactions` and replays the
//! deterministic local pipeline over their signatures.
const LOCAL_PIPELINE_DEX_DECODER_SCOPE: &str = "dex_decode.local_pipeline";
const LOCAL_PIPELINE_DEX_DECODER_VERSION: &str = "dex_decode.v0.7.44.ledger1";
fn default_skip_certified_dex_decode() -> bool {
return true;
}
/// Configuration for a local pipeline replay pass.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -18,6 +25,12 @@ pub struct LocalPipelineReplayConfig {
pub token_metadata_limit: std::option::Option<i64>,
/// Whether locally replayed market materialization tables are reset before replay.
pub reset_market_materialization_before_replay: bool,
/// Whether DEX decoding may be skipped when the replay ledger certifies it is safe.
#[serde(default = "default_skip_certified_dex_decode")]
pub skip_certified_dex_decode: bool,
/// Whether DEX decoding must run even when the replay ledger certifies a safe prior pass.
#[serde(default)]
pub force_decode_replay: bool,
}
impl Default for LocalPipelineReplayConfig {
@@ -27,6 +40,8 @@ impl Default for LocalPipelineReplayConfig {
refresh_missing_token_metadata: false,
token_metadata_limit: Some(250),
reset_market_materialization_before_replay: true,
skip_certified_dex_decode: true,
force_decode_replay: false,
};
}
}
@@ -53,6 +68,14 @@ pub struct LocalPipelineReplayResult {
pub analytic_signal_error_count: usize,
/// Total decoded events returned by replayed decode calls.
pub decoded_event_count: usize,
/// Number of transactions where DEX decoding was skipped through the replay ledger.
pub decode_skipped_count: usize,
/// Number of persisted decoded events covered by skipped decode ledger rows.
pub decode_skipped_event_count: usize,
/// Number of replay ledger rows upserted by this replay pass.
pub decode_ledger_upsert_count: usize,
/// Number of replay ledger rows marked unsafe for future decode skip.
pub decode_ledger_unsafe_count: usize,
/// Total detection results returned by replayed detect calls.
pub detection_count: usize,
/// Total trade aggregation results returned by replayed aggregation calls.
@@ -170,21 +193,135 @@ impl LocalPipelineReplayService {
signature = %signature,
"replaying local pipeline for persisted transaction"
);
let decode_result =
dex_decode.decode_transaction_by_signature(signature.as_str()).await;
match decode_result {
Ok(decoded_events) => {
result.decoded_event_count += decoded_events.len();
let transaction_result =
crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature.as_str())
.await;
let transaction = match transaction_result {
Ok(Some(transaction)) => transaction,
Ok(None) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
"local pipeline replay transaction row disappeared before replay"
);
continue;
},
Err(error) => {
result.decode_error_count += 1;
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay decode step failed"
"local pipeline replay transaction lookup failed"
);
continue;
},
};
let transaction_id = match transaction.id {
Some(transaction_id) => transaction_id,
None => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
"local pipeline replay transaction row has no persisted id"
);
continue;
},
};
let decode_skip_ledger_result = self
.get_certified_dex_decode_skip_ledger(config, transaction_id, signature.as_str())
.await;
let decode_skip_ledger = match decode_skip_ledger_result {
Ok(decode_skip_ledger) => decode_skip_ledger,
Err(error) => return Err(error),
};
match decode_skip_ledger {
Some(ledger) => {
result.decode_skipped_count += 1;
let ledger_event_count = usize::try_from(ledger.event_count);
match ledger_event_count {
Ok(event_count) => {
result.decode_skipped_event_count += event_count;
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
event_count = ledger.event_count,
error = %error,
"local pipeline replay could not convert skipped event count"
);
},
}
tracing::debug!(
signature = %signature,
event_count = ledger.event_count,
decoder_version = %ledger.decoder_version,
"local pipeline replay skipped certified DEX decode step"
);
},
None => {
let decode_result = dex_decode
.decode_transaction_by_signature(signature.as_str())
.await;
match decode_result {
Ok(decoded_events) => {
result.decoded_event_count += decoded_events.len();
let ledger_result = self
.record_dex_decode_replay_ledger(
transaction_id,
signature.as_str(),
&decoded_events,
)
.await;
match ledger_result {
Ok(ledger) => {
result.decode_ledger_upsert_count += 1;
if ledger.force_replay_required {
result.decode_ledger_unsafe_count += 1;
}
},
Err(error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay could not record successful decode ledger row"
);
},
}
},
Err(error) => {
result.decode_error_count += 1;
let ledger_result = self
.record_failed_dex_decode_replay_ledger(
transaction_id,
signature.as_str(),
error.to_string(),
)
.await;
match ledger_result {
Ok(_) => {
result.decode_ledger_upsert_count += 1;
result.decode_ledger_unsafe_count += 1;
},
Err(ledger_error) => {
result.global_error_count += 1;
tracing::warn!(
signature = %signature,
error = %ledger_error,
"local pipeline replay could not record failed decode ledger row"
);
},
}
tracing::warn!(
signature = %signature,
error = %error,
"local pipeline replay decode step failed"
);
continue;
},
}
},
}
let detect_result =
dex_detect.detect_transaction_by_signature(signature.as_str()).await;
@@ -315,6 +452,236 @@ impl LocalPipelineReplayService {
}
return Ok(result);
}
async fn get_certified_dex_decode_skip_ledger(
&self,
config: &crate::LocalPipelineReplayConfig,
transaction_id: i64,
signature: &str,
) -> Result<std::option::Option<crate::DexDecodeReplayLedgerDto>, crate::Error> {
if config.force_decode_replay {
return Ok(None);
}
if !config.skip_certified_dex_decode {
return Ok(None);
}
let ledger_result = crate::query_dex_decode_replay_ledger_get_by_transaction(
self.database.as_ref(),
transaction_id,
LOCAL_PIPELINE_DEX_DECODER_SCOPE,
LOCAL_PIPELINE_DEX_DECODER_VERSION,
)
.await;
let ledger_option = match ledger_result {
Ok(ledger_option) => ledger_option,
Err(error) => return Err(error),
};
match ledger_option {
Some(ledger) => {
if ledger.can_skip_decode() {
let persisted_count_result = self
.count_persisted_decoded_events_for_skip(transaction_id, signature)
.await;
let persisted_count = match persisted_count_result {
Ok(persisted_count) => persisted_count,
Err(error) => return Err(error),
};
if persisted_count >= ledger.event_count {
return Ok(Some(ledger));
}
tracing::debug!(
signature = %signature,
ledger_event_count = ledger.event_count,
persisted_event_count = persisted_count,
"local pipeline replay ledger is certified but persisted decoded events are missing"
);
return Ok(None);
}
tracing::debug!(
signature = %signature,
decode_status = %ledger.decode_status,
certainty = %ledger.certainty,
force_replay_required = ledger.force_replay_required,
"local pipeline replay ledger requires DEX decode"
);
return Ok(None);
},
None => return Ok(None),
}
}
async fn count_persisted_decoded_events_for_skip(
&self,
transaction_id: i64,
signature: &str,
) -> Result<i64, crate::Error> {
let events_result = crate::query_dex_decoded_events_list_by_transaction_id(
self.database.as_ref(),
transaction_id,
)
.await;
let events = match events_result {
Ok(events) => events,
Err(error) => return Err(error),
};
let count_result = i64::try_from(events.len());
match count_result {
Ok(count) => return Ok(count),
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert persisted decoded event count for signature '{}' to i64: {}",
signature, error
)));
},
}
}
async fn record_dex_decode_replay_ledger(
&self,
transaction_id: i64,
signature: &str,
decoded_events: &[crate::DexDecodedEventDto],
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let ledger_result = build_success_dex_decode_replay_ledger(
transaction_id,
signature,
decoded_events,
);
let ledger = match ledger_result {
Ok(ledger) => ledger,
Err(error) => return Err(error),
};
let upsert_result =
crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await;
match upsert_result {
Ok(_) => return Ok(ledger),
Err(error) => return Err(error),
}
}
async fn record_failed_dex_decode_replay_ledger(
&self,
transaction_id: i64,
signature: &str,
error_message: std::string::String,
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let ledger = crate::DexDecodeReplayLedgerDto::new(
transaction_id,
signature.to_string(),
LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(),
LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(),
crate::DexDecodeReplayLedgerDto::STATUS_FAILED.to_string(),
crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string(),
0,
0,
true,
Some(format!("decode failed: {error_message}")),
);
let upsert_result =
crate::query_dex_decode_replay_ledger_upsert(self.database.as_ref(), &ledger).await;
match upsert_result {
Ok(_) => return Ok(ledger),
Err(error) => return Err(error),
}
}
}
fn build_success_dex_decode_replay_ledger(
transaction_id: i64,
signature: &str,
decoded_events: &[crate::DexDecodedEventDto],
) -> Result<crate::DexDecodeReplayLedgerDto, crate::Error> {
let event_count_result = i64::try_from(decoded_events.len());
let event_count = match event_count_result {
Ok(event_count) => event_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert decoded event count '{}' to i64: {}",
decoded_events.len(), error
)));
},
};
let distinct_token_mint_count_usize = count_distinct_decoded_event_token_mints(decoded_events);
let distinct_token_mint_count_result = i64::try_from(distinct_token_mint_count_usize);
let distinct_token_mint_count = match distinct_token_mint_count_result {
Ok(distinct_token_mint_count) => distinct_token_mint_count,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot convert distinct token mint count '{}' to i64: {}",
distinct_token_mint_count_usize, error
)));
},
};
let force_replay_required = event_count > 1 || distinct_token_mint_count > 2;
let decode_status = if event_count == 0 {
crate::DexDecodeReplayLedgerDto::STATUS_NO_EVENTS.to_string()
} else {
crate::DexDecodeReplayLedgerDto::STATUS_DECODED.to_string()
};
let certainty = if force_replay_required {
crate::DexDecodeReplayLedgerDto::CERTAINTY_UNSAFE.to_string()
} else {
crate::DexDecodeReplayLedgerDto::CERTAINTY_SURE.to_string()
};
let status_reason = build_dex_decode_replay_ledger_status_reason(
event_count,
distinct_token_mint_count,
force_replay_required,
);
return Ok(crate::DexDecodeReplayLedgerDto::new(
transaction_id,
signature.to_string(),
LOCAL_PIPELINE_DEX_DECODER_SCOPE.to_string(),
LOCAL_PIPELINE_DEX_DECODER_VERSION.to_string(),
decode_status,
certainty,
event_count,
distinct_token_mint_count,
force_replay_required,
Some(status_reason),
));
}
fn count_distinct_decoded_event_token_mints(
decoded_events: &[crate::DexDecodedEventDto],
) -> usize {
let mut mints = std::collections::BTreeSet::<std::string::String>::new();
for event in decoded_events {
insert_optional_mint(&mut mints, &event.lp_mint);
insert_optional_mint(&mut mints, &event.token_a_mint);
insert_optional_mint(&mut mints, &event.token_b_mint);
}
return mints.len();
}
fn insert_optional_mint(
mints: &mut std::collections::BTreeSet<std::string::String>,
mint_option: &std::option::Option<std::string::String>,
) {
if let Some(mint) = mint_option {
let trimmed = mint.trim();
if !trimmed.is_empty() {
mints.insert(trimmed.to_string());
}
}
}
fn build_dex_decode_replay_ledger_status_reason(
event_count: i64,
distinct_token_mint_count: i64,
force_replay_required: bool,
) -> std::string::String {
if event_count == 0 {
return "decode completed with no persisted DEX event".to_string();
}
if force_replay_required {
return format!(
"decode completed but remains unsafe for skip: event_count={event_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
return format!(
"decode completed and certified for skip: event_count={event_count}, distinct_token_mint_count={distinct_token_mint_count}"
);
}
/// Replays the local pipeline from persisted raw chain transaction rows.