This commit is contained in:
2026-05-13 09:39:50 +02:00
parent aa19ca9c18
commit 69385094ff
16 changed files with 293 additions and 36 deletions

View File

@@ -207,6 +207,7 @@ pub use queries::query_trade_events_get_by_decoded_event_id;
pub use queries::query_trade_events_list_by_pair_id;
pub use queries::query_trade_events_list_by_transaction_id;
pub use queries::query_trade_events_upsert;
pub use queries::query_trade_market_materialization_delete_all;
pub use queries::query_transaction_classifications_get_by_signature;
pub use queries::query_transaction_classifications_get_by_transaction_id;
pub use queries::query_transaction_classifications_list_recent;

View File

@@ -145,6 +145,7 @@ pub use trade_event::query_trade_events_get_by_decoded_event_id;
pub use trade_event::query_trade_events_list_by_pair_id;
pub use trade_event::query_trade_events_list_by_transaction_id;
pub use trade_event::query_trade_events_upsert;
pub use trade_event::query_trade_market_materialization_delete_all;
pub use transaction_classification::query_transaction_classifications_get_by_signature;
pub use transaction_classification::query_transaction_classifications_get_by_transaction_id;
pub use transaction_classification::query_transaction_classifications_list_recent;

View File

@@ -2,6 +2,41 @@
//! Queries for `k_sol_trade_events`.
/// Deletes all local market materialization rows that are rebuilt from trade events.
///
/// The local replay pipeline is deterministic over persisted raw transactions. Clearing
/// these derived tables before replay prevents stale failed-transaction trades, metrics
/// or candles from surviving after actionability rules change.
pub async fn query_trade_market_materialization_delete_all(
database: &crate::Database,
) -> Result<u64, crate::Error> {
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let statements = [
("k_sol_pair_analytic_signals", "DELETE FROM k_sol_pair_analytic_signals"),
("k_sol_pair_candles", "DELETE FROM k_sol_pair_candles"),
("k_sol_pair_metrics", "DELETE FROM k_sol_pair_metrics"),
("k_sol_trade_events", "DELETE FROM k_sol_trade_events"),
];
let mut deleted_count = 0_u64;
for (table_name, statement) in statements {
let query_result = sqlx::query(statement).execute(pool).await;
let result = match query_result {
Ok(result) => result,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot clear {} during local market materialization reset on sqlite: {}",
table_name, error
)));
},
};
deleted_count += result.rows_affected();
}
return Ok(deleted_count);
},
}
}
/// Inserts or updates one trade-event row and returns its stable internal id.
pub async fn query_trade_events_upsert(
database: &crate::Database,

View File

@@ -687,6 +687,8 @@ pub use db::query_trade_events_list_by_pair_id;
pub use db::query_trade_events_list_by_transaction_id;
/// Inserts or updates one trade-event row and returns its stable internal id.
pub use db::query_trade_events_upsert;
/// Deletes all local market materialization rows rebuilt from trade events.
pub use db::query_trade_market_materialization_delete_all;
/// Reads one transaction classification by signature.
pub use db::query_transaction_classifications_get_by_signature;
/// Reads one transaction classification by transaction id.

View File

@@ -16,6 +16,8 @@ pub struct LocalPipelineReplayConfig {
pub refresh_missing_token_metadata: bool,
/// Maximum number of missing token metadata rows to resolve.
pub token_metadata_limit: std::option::Option<i64>,
/// Whether locally replayed market materialization tables are reset before replay.
pub reset_market_materialization_before_replay: bool,
}
impl Default for LocalPipelineReplayConfig {
@@ -24,6 +26,7 @@ impl Default for LocalPipelineReplayConfig {
limit: Some(10_000),
refresh_missing_token_metadata: false,
token_metadata_limit: Some(250),
reset_market_materialization_before_replay: true,
};
}
}
@@ -71,6 +74,8 @@ pub struct LocalPipelineReplayResult {
pub token_metadata_updated_count: usize,
/// Number of pair symbols updated after replay.
pub pair_symbol_updated_count: usize,
/// Number of derived market materialization rows deleted before replay.
pub reset_market_materialization_deleted_count: u64,
/// Number of errors outside per-signature replay.
pub global_error_count: usize,
}
@@ -120,6 +125,19 @@ impl LocalPipelineReplayService {
Ok(signatures) => signatures,
Err(error) => return Err(error),
};
let mut reset_market_materialization_deleted_count = 0_u64;
if config.reset_market_materialization_before_replay {
let reset_result =
crate::query_trade_market_materialization_delete_all(self.database.as_ref()).await;
reset_market_materialization_deleted_count = match reset_result {
Ok(deleted_count) => deleted_count,
Err(error) => return Err(error),
};
tracing::debug!(
deleted_count = reset_market_materialization_deleted_count,
"local pipeline replay reset market materialization tables"
);
}
let dex_decode = crate::DexDecodeService::new(self.database.clone());
let dex_detect = crate::DexDetectService::new(self.database.clone());
let trade_aggregation = crate::TradeAggregationService::new(self.database.clone());
@@ -130,6 +148,7 @@ impl LocalPipelineReplayService {
crate::TransactionClassificationService::new(self.database.clone());
let mut result = LocalPipelineReplayResult {
selected_transaction_count: signatures.len(),
reset_market_materialization_deleted_count,
..Default::default()
};
for signature in signatures {

View File

@@ -38,6 +38,8 @@ pub struct LocalPipelineValidationConfig {
pub require_trade_events_per_dex: bool,
/// Whether each DEX summary must contain at least one candle.
pub require_candles_per_dex: bool,
/// Whether non-actionable classification groups must have no linked trade events.
pub require_no_non_actionable_trade_events_materialized: bool,
}
impl Default for LocalPipelineValidationConfig {
@@ -56,6 +58,7 @@ impl Default for LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: true,
require_candles_per_dex: true,
require_no_non_actionable_trade_events_materialized: true,
};
}
}
@@ -82,6 +85,7 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: true,
require_candles_per_dex: true,
require_no_non_actionable_trade_events_materialized: true,
};
}
@@ -113,10 +117,10 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: false,
require_candles_per_dex: false,
require_no_non_actionable_trade_events_materialized: true,
};
}
/// Builds the `0.7.29` DEX support matrix baseline validation config.
///
/// This profile preserves the `0.7.28` trade/candle non-regression checks
@@ -144,6 +148,7 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: false,
require_candles_per_dex: false,
require_no_non_actionable_trade_events_materialized: true,
};
}
@@ -157,6 +162,18 @@ impl LocalPipelineValidationConfig {
config.profile_code = "0.7.30_non_trade_event_classification".to_string();
return config;
}
/// Builds the `0.7.31` trade-event actionability validation config.
///
/// This profile keeps the `0.7.30` classification checks and additionally
/// requires failed/non-actionable classification groups to have no linked
/// persisted trade events.
pub fn v0_7_31_trade_event_actionability_policy() -> Self {
let mut config = Self::v0_7_30_non_trade_event_classification();
config.profile_code = "0.7.31_trade_event_actionability_policy".to_string();
config.require_no_non_actionable_trade_events_materialized = true;
return config;
}
}
/// A single local pipeline validation issue.
@@ -277,7 +294,6 @@ impl LocalPipelineValidationService {
return self.validate_current_database(&config).await;
}
/// Diagnoses the current database with the `0.7.29` DEX matrix baseline profile.
pub async fn validate_v0_7_29_current_database(
&self,
@@ -293,6 +309,15 @@ impl LocalPipelineValidationService {
let config = crate::LocalPipelineValidationConfig::v0_7_30_non_trade_event_classification();
return self.validate_current_database(&config).await;
}
/// Diagnoses the current database with the `0.7.31` trade actionability profile.
pub async fn validate_v0_7_31_current_database(
&self,
) -> Result<crate::LocalPipelineValidationRunDto, crate::Error> {
let config =
crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
return self.validate_current_database(&config).await;
}
}
/// Validates a diagnostics summary without performing database access.
@@ -386,6 +411,24 @@ pub fn validate_local_pipeline_diagnostics_summary(
blocking: true,
});
}
if config.require_no_non_actionable_trade_events_materialized {
for classification_summary in &summary.event_classification_summaries {
if classification_summary.event_actionability != "trade_candidate"
&& classification_summary.trade_event_count > 0
{
issues.push(LocalPipelineValidationIssueDto {
code: "non_actionable_trade_events_materialized".to_string(),
message: format!(
"classification '{}' has {} linked trade event(s)",
classification_summary.event_actionability,
classification_summary.trade_event_count
),
subject: Some(classification_summary.event_actionability.clone()),
blocking: true,
});
}
}
}
if config.require_all_expected_dexes {
for expected_dex_code in &expected_dex_codes {
if !observed_dex_codes.contains(expected_dex_code) {
@@ -453,8 +496,7 @@ pub fn validate_local_pipeline_diagnostics_summary(
expected_dex_codes,
observed_dex_codes,
decoded_non_trade_useful_event_count: summary.decoded_non_trade_useful_event_count,
decoded_non_actionable_trade_event_count: summary
.decoded_non_actionable_trade_event_count,
decoded_non_actionable_trade_event_count: summary.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: summary.decoded_unknown_event_count,
dex_support_matrix_entry_count: crate::dex_support_matrix_entries().len() as i64,
dex_support_matrix: crate::dex_support_matrix_entry_dtos(),
@@ -670,6 +712,63 @@ mod tests {
assert_eq!(report.decoded_unknown_event_count, 0);
}
#[test]
fn validation_accepts_0_7_31_trade_actionability_policy_summary() {
let mut summary = make_0_7_28_summary_with_meteora();
summary.event_classification_summaries.push(
crate::LocalEventClassificationDiagnosticSummaryDto {
event_category: "trade".to_string(),
event_lifecycle_kind: "trade_swap".to_string(),
event_actionability: "failed_transaction".to_string(),
non_trade_useful: false,
event_count: 10,
decoded_trade_candidate_count: 0,
decoded_candle_candidate_count: 0,
trade_event_count: 0,
},
);
summary.event_classification_summaries.push(
crate::LocalEventClassificationDiagnosticSummaryDto {
event_category: "trade".to_string(),
event_lifecycle_kind: "trade_swap".to_string(),
event_actionability: "trade_candidate".to_string(),
non_trade_useful: false,
event_count: 12,
decoded_trade_candidate_count: 12,
decoded_candle_candidate_count: 12,
trade_event_count: 12,
},
);
let config =
crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
assert!(report.validation_passed);
assert_eq!(report.validation_profile_code, "0.7.31_trade_event_actionability_policy");
}
#[test]
fn validation_rejects_0_7_31_failed_transaction_trade_events() {
let mut summary = make_0_7_28_summary_with_meteora();
summary.event_classification_summaries.push(
crate::LocalEventClassificationDiagnosticSummaryDto {
event_category: "trade".to_string(),
event_lifecycle_kind: "trade_swap".to_string(),
event_actionability: "failed_transaction".to_string(),
non_trade_useful: false,
event_count: 10,
decoded_trade_candidate_count: 0,
decoded_candle_candidate_count: 0,
trade_event_count: 2,
},
);
let config =
crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
assert!(!report.validation_passed);
assert_eq!(report.blocking_issue_count, 1);
assert_eq!(report.issues[0].code, "non_actionable_trade_events_materialized");
}
#[test]
fn validation_report_exposes_dex_support_matrix() {
let summary = make_0_7_28_summary_with_meteora();

View File

@@ -47,6 +47,13 @@ impl TradeAggregationService {
Err(error) => return Err(error),
};
let transaction = transaction_context.transaction;
if transaction.err_json.is_some() {
tracing::debug!(
signature = %transaction.signature,
"skipping trade aggregation for failed transaction"
);
return Ok(std::vec::Vec::new());
}
let transaction_id = transaction_context.transaction_id;
let decoded_events = transaction_context.decoded_events;
let mut results = std::vec::Vec::new();
@@ -221,11 +228,12 @@ mod tests {
return std::sync::Arc::new(database);
}
async fn seed_fluxbeam_swap_transaction(
async fn seed_fluxbeam_swap_transaction_with_err(
database: std::sync::Arc<crate::Database>,
signature: &str,
base_amount_raw: &str,
quote_amount_raw: &str,
meta_err: serde_json::Value,
) {
let transaction_model = crate::TransactionModelService::new(database.clone());
let dex_decode = crate::DexDecodeService::new(database.clone());
@@ -263,7 +271,7 @@ mod tests {
}
},
"meta": {
"err": null,
"err": meta_err,
"logMessages": [
"Program log: Instruction: Swap",
"Program log: buy"
@@ -290,6 +298,38 @@ mod tests {
}
}
async fn seed_fluxbeam_swap_transaction(
database: std::sync::Arc<crate::Database>,
signature: &str,
base_amount_raw: &str,
quote_amount_raw: &str,
) {
seed_fluxbeam_swap_transaction_with_err(
database,
signature,
base_amount_raw,
quote_amount_raw,
serde_json::Value::Null,
)
.await;
}
async fn seed_failed_fluxbeam_swap_transaction(
database: std::sync::Arc<crate::Database>,
signature: &str,
base_amount_raw: &str,
quote_amount_raw: &str,
) {
seed_fluxbeam_swap_transaction_with_err(
database,
signature,
base_amount_raw,
quote_amount_raw,
serde_json::json!({ "InstructionError": [0, { "Custom": 1 }] }),
)
.await;
}
#[tokio::test]
async fn record_transaction_by_signature_creates_trade_event_and_pair_metric() {
let database = make_database().await;
@@ -375,4 +415,49 @@ mod tests {
};
assert_eq!(pair_metric.trade_count, 1);
}
#[tokio::test]
async fn record_transaction_by_signature_skips_failed_transaction() {
let database = make_database().await;
seed_failed_fluxbeam_swap_transaction(
database.clone(),
"sig-trade-aggregation-failed-1",
"1000",
"2500",
)
.await;
let transaction_result = crate::query_chain_transactions_get_by_signature(
database.as_ref(),
"sig-trade-aggregation-failed-1",
)
.await;
let transaction_option = match transaction_result {
Ok(transaction_option) => transaction_option,
Err(error) => panic!("transaction fetch must succeed: {}", error),
};
let transaction = match transaction_option {
Some(transaction) => transaction,
None => panic!("transaction must exist"),
};
let transaction_id = match transaction.id {
Some(transaction_id) => transaction_id,
None => panic!("transaction id must exist"),
};
let service = crate::TradeAggregationService::new(database.clone());
let record_result =
service.record_transaction_by_signature("sig-trade-aggregation-failed-1").await;
let results = match record_result {
Ok(results) => results,
Err(error) => panic!("failed transaction aggregation must not fail: {}", error),
};
assert_eq!(results.len(), 0);
let trade_events_result =
crate::query_trade_events_list_by_transaction_id(database.as_ref(), transaction_id)
.await;
let trade_events = match trade_events_result {
Ok(trade_events) => trade_events,
Err(error) => panic!("trade event list must succeed: {}", error),
};
assert_eq!(trade_events.len(), 0);
}
}