diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0ea17e..be3ffad 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -61,3 +61,4 @@
0.7.28 - Refactor DEX commun et verrouillage des invariants de normalisation : séparation des événements décodés, actionnables, trade candidates et candle candidates ; conservation des transactions failed comme traçables mais non actionnables ; ajout de la règle bloquante empêchant tout trade/candle candidate sans payload de montants exploitable, notamment pour le cas partiel `meteora_damm_v1.swap` sans base/quote amount.
0.7.29 - Ajout d’une matrice DEX commune (`dex_support_matrix`) utilisée par le catalogue DEX, la classification transactionnelle et l’enregistrement des protocol candidates ; ajout du profil de validation `0.7.29_multi_dex_matrix_baseline` exposant la matrice dans le rapport de validation ; préparation explicite des surfaces planifiées sans inventer de program ids non vérifiés.
0.7.30 - Ajout d’une taxonomie DEX plus fine pour les événements décodés : `eventLifecycleKind`, `eventActionability`, `nonTradeUseful`, compteurs diagnostics des événements non-trade utiles, trades non actionnables et classifications inconnues ; ajout du profil `0.7.30_non_trade_event_classification` sans modification volontaire de la matérialisation trade/candle.
+0.7.31 - Application de la politique Option B : les transactions failed restent traçables dans les événements décodés mais ne peuvent plus alimenter `trade_events`, metrics ou candles ; le replay local réinitialise les tables de matérialisation marché avant reconstruction pour supprimer les anciennes lignes dérivées non actionnables.
diff --git a/Cargo.toml b/Cargo.toml
index 8e9a43f..f094054 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
-version = "0.7.30"
+version = "0.7.31"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"
diff --git a/README.md b/README.md
index 3466443..12a55fe 100644
--- a/README.md
+++ b/README.md
@@ -215,7 +215,7 @@ Les tests peuvent rester plus souples lorsque cela clarifie le test.
La reprise doit suivre cet ordre :
-1. conserver la non-régression `0.7.28` : transactions failed traçables mais non actionnables, aucun trade/candle candidate sans montant exploitable ;
+1. conserver la non-régression `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles ;
2. utiliser la matrice `0.7.29` comme source commune pour le catalogue, la classification et les protocol candidates ;
3. relier progressivement les événements non-trade aux tables existantes : lifecycle, liquidité, fees, rewards, admin ;
4. consolider Meteora, surtout `meteora_dlmm` et le cas partiel `meteora_damm_v1` ;
diff --git a/ROADMAP.md b/ROADMAP.md
index 9b59866..4fbcbcf 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -826,9 +826,7 @@ Matrice cible initiale :
| `zora` | à vérifier | à vérifier | hors phasage actif avant preuve Solana |
### 6.062. Version `0.7.30` — Classification fine des événements DEX décodés
-Objectif : préparer les événements non-trade utiles sans modifier la matérialisation trade/candle validée.
-
-Fait / à valider :
+Réalisé :
- ajouter `DexEventLifecycleKind` pour distinguer `trade_swap`, `pool_creation`, `pair_creation`, `liquidity_add`, `liquidity_remove`, `position_open`, `position_close`, `migration`, `launch`, `mint`, `burn`, `fee_collection`, `reward`, `admin_config` et `unknown`,
- ajouter `DexEventActionability` pour distinguer `trade_candidate`, `non_actionable_trade`, `non_trade_useful`, `failed_transaction`, `informational` et `unknown`,
@@ -838,7 +836,17 @@ Fait / à valider :
- ajouter le profil `0.7.30_non_trade_event_classification`,
- ne pas matérialiser encore les événements non-trade dans leurs tables dédiées.
-### 6.063. Version `0.7.31` — Transactions inconnues et protocol candidates
+### 6.063. Version `0.7.31` — Politique de matérialisation des failed transactions
+Réalisé :
+
+- empêcher `TradeAggregationService` de matérialiser une transaction dont `err_json` est renseigné ;
+- conserver les événements DEX décodés des failed transactions pour audit et diagnostic ;
+- réinitialiser les tables dérivées de marché pendant le replay local : `k_sol_trade_events`, `k_sol_pair_metrics`, `k_sol_pair_candles`, `k_sol_pair_analytic_signals` ;
+- reconstruire ensuite les trades/candles uniquement à partir des événements `tradeCandidate=true` et de transactions OK ;
+- exposer `resetMarketMaterializationDeletedCount` dans le résultat de replay UI ;
+- conserver la validation multi-DEX et la matrice DEX comme garde-fous avant d’ajouter les surfaces restantes.
+
+### 6.064. Version `0.7.32` — Transactions inconnues et protocol candidates
Objectif : ne plus perdre les transactions utiles qui ne correspondent pas encore à un DEX connu.
À faire :
@@ -851,7 +859,7 @@ Objectif : ne plus perdre les transactions utiles qui ne correspondent pas encor
- permettre de promouvoir plus tard un protocol candidate vers un vrai DEX/surface sans perdre l’historique,
- garantir que ces tables n’alimentent jamais directement les trades/candles.
-### 6.064. Version `0.7.32` — Événements non-trade v1 : liquidité et cycle de vie pool
+### 6.065. Version `0.7.33` — Événements non-trade v1 : liquidité et cycle de vie pool
Objectif : exploiter les événements utiles à l’analyse et au trading semi-automatique sans les mélanger avec les swaps/candles.
À faire :
@@ -864,7 +872,7 @@ Objectif : exploiter les événements utiles à l’analyse et au trading semi-a
- alimenter les diagnostics locaux avec les compteurs liquidité/lifecycle,
- garantir qu’un événement de liquidité ou de cycle de vie ne produit jamais de candle directement.
-### 6.065. Version `0.7.33` — Événements non-trade v2 : fees, rewards et administration
+### 6.066. Version `0.7.34` — Événements non-trade v2 : fees, rewards et administration
Objectif : conserver les événements utiles au risque, au scoring, à l’économie du pool et à la traçabilité opérationnelle.
À faire :
@@ -878,7 +886,7 @@ Objectif : conserver les événements utiles au risque, au scoring, à l’écon
- rattacher ces événements aux transactions, decoded events, pools, paires et wallets observés lorsque les comptes le permettent,
- documenter clairement que ces événements ne sont ni des trades ni des candles.
-### 6.066. Version `0.7.34` — Meteora : DBC / DAMM v1 / DAMM v2 / DLMM
+### 6.067. Version `0.7.35` — Meteora : DBC / DAMM v1 / DAMM v2 / DLMM
Objectif : consolider Meteora comme famille multi-programmes au lieu de traiter chaque variante comme un cas isolé incomplet.
À faire :
@@ -892,7 +900,7 @@ Objectif : consolider Meteora comme famille multi-programmes au lieu de traiter
- vérifier l’idempotence du replay local sur un corpus Meteora mixte,
- documenter les limites connues des variantes insuffisamment couvertes.
-### 6.066. Version `0.7.34` — Launch surfaces : LaunchLab, LetsBonk, Bags, Moonshot/Moonit, Boop.fun, Believe
+### 6.068. Version `0.7.36` — Launch surfaces : LaunchLab, LetsBonk, Bags, Moonshot/Moonit, Boop.fun, Believe
Objectif : détecter la première source de mint/lancement des tokens même lorsque le swap final se fait ailleurs.
À faire :
@@ -907,7 +915,7 @@ Objectif : détecter la première source de mint/lancement des tokens même lors
- rattacher les launch origins aux pools et paires lorsque les comptes permettent un matching fiable,
- exposer les origins dans les diagnostics et l’UI d’inspection.
-### 6.067. Version `0.7.35` — Heaven : corpus, launch et AMM
+### 6.069. Version `0.7.37` — Heaven : corpus, launch et AMM
Objectif : ajouter Heaven sans le classer trop tôt comme simple DEX ou simple launchpad.
À faire :
@@ -919,7 +927,7 @@ Objectif : ajouter Heaven sans le classer trop tôt comme simple DEX ou simple l
- documenter les limites si le corpus ne permet pas encore de matérialiser tous les événements,
- vérifier que Heaven ne crée pas de candles invalides en cas d’événement de launch non pricé.
-### 6.068. Version `0.7.36` — Orca / FluxBeam / DexLab : corpus et validation ciblée
+### 6.070. Version `0.7.38` — Orca / FluxBeam / DexLab : corpus et validation ciblée
Objectif : consolider les connecteurs déjà présents à partir de corpus locaux vérifiables.
À faire :
@@ -931,7 +939,7 @@ Objectif : consolider les connecteurs déjà présents à partir de corpus locau
- marquer explicitement les variantes partiellement supportées ou heuristiques,
- rejouer les corpus plusieurs fois pour vérifier l’idempotence et l’absence de trades/candles invalides.
-### 6.069. Version `0.7.37` — Raydium AMM v4 legacy : corpus et validation ciblée
+### 6.071. Version `0.7.39` — Raydium AMM v4 legacy : corpus et validation ciblée
Objectif : traiter le vrai Raydium AMM v4 historique après les autres Raydium, afin de l’isoler de `raydium_cpmm`, `raydium_clmm` et des labels Raydium génériques.
À faire :
@@ -944,7 +952,7 @@ Objectif : traiter le vrai Raydium AMM v4 historique après les autres Raydium,
- renommer/stabiliser les fonctions internes autour de `raydium_amm_v4` pour éviter l’ambiguïté avec `raydium_cpmm` et `raydium_clmm`,
- documenter les limites connues si le corpus AMM v4 reste faible.
-### 6.070. Version `0.7.38` — Validation DEX v1 consolidée
+### 6.072. Version `0.7.40` — Validation DEX v1 consolidée
Objectif : rejouer tous les DEX et launch surfaces supportés et valider les invariants du pipeline complet.
À faire :
@@ -957,7 +965,7 @@ Objectif : rejouer tous les DEX et launch surfaces supportés et valider les inv
- conserver une matrice de support par DEX, variante, instruction et type d’événement,
- verrouiller les invariants avant d’ouvrir l’analyse `0.8.x`.
-### 6.071. Version `0.7.39` — `kb_demo_app` : overlays analytiques
+### 6.073. Version `0.7.41` — `kb_demo_app` : overlays analytiques
Objectif : rendre visibles les signaux analytiques directement sur les graphes et vues de marché.
À faire :
@@ -968,7 +976,7 @@ Objectif : rendre visibles les signaux analytiques directement sur les graphes e
- afficher un panneau latéral listant les signaux liés à une paire et à un timeframe,
- préparer l’extension future vers Ichimoku, Kumo, projections ABCD et égalités temps/prix sans les mélanger au pipeline de décodage DEX.
-### 6.072. Version `0.7.40` — `kb_demo_app` : vues consolidées token / pair / pool
+### 6.074. Version `0.7.42` — `kb_demo_app` : vues consolidées token / pair / pool
Objectif : fournir une lecture métier plus confortable du modèle `0.7.x`.
À faire :
@@ -980,7 +988,7 @@ Objectif : fournir une lecture métier plus confortable du modèle `0.7.x`.
- préparer une navigation transversale entre objets techniques et objets métier,
- rendre explicites les cas `tradeCount = null`, `lastPriceQuotePerBase = null`, tokens non enrichis et événements conservés uniquement pour analyse.
-### 6.073. Version `0.7.41` — Finition UI `0.7.x`
+### 6.075. Version `0.7.43` — Finition UI `0.7.x`
Objectif : stabiliser la couche desktop de validation avant l’ouverture de `0.8.x`.
À faire :
@@ -991,7 +999,7 @@ Objectif : stabiliser la couche desktop de validation avant l’ouverture de `0.
- préparer une base UI suffisamment stable pour la future phase d’analyse et filtrage `0.8.x`,
- vérifier que les commandes Tauri restent de simples façades vers `kb_lib`.
-### 6.074. Version `0.7.x` — Couverture DEX v1
+### 6.076. Version `0.7.x` — Couverture DEX v1
Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage, normalisation métier et classification des événements non-trade.
Protocoles et surfaces cibles :
@@ -1034,7 +1042,7 @@ Résultat attendu :
- préparation d’une détection temps réel hybride et d’un backfill ciblé compatible avec les mêmes objets métier,
- préparation d’agrégats DEX plus riches, de candles/OHLCV et d’une UI d’inspection du pipeline `0.7.x`.
-### 6.075. Version `0.8.x` — Analyse et filtrage
+### 6.077. Version `0.8.x` — Analyse et filtrage
Objectif : transformer les événements bruts en signaux exploitables.
À faire :
@@ -1049,7 +1057,7 @@ Objectif : transformer les événements bruts en signaux exploitables.
- outils de sélection manuelle de points ABC et projection d’un point D selon des règles temps/prix explicites,
- séparation stricte entre signaux analytiques observés, projections hypothétiques et décisions de trading.
-### 6.076. Version `1.x.y` — Wallets et swap préparatoire
+### 6.078. Version `1.x.y` — Wallets et swap préparatoire
Objectif : préparer la couche d’action.
À faire :
@@ -1060,7 +1068,7 @@ Objectif : préparer la couche d’action.
- préparation d’ordres et de swaps,
- simulation et garde-fous.
-### 6.077. Version `2.x.y` — Trading semi-automatisé
+### 6.079. Version `2.x.y` — Trading semi-automatisé
Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites.
À faire :
@@ -1071,7 +1079,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp
- confirmations explicites ou semi-automatiques,
- journaux d’exécution.
-### 6.078. Version `3.x.y` — Yellowstone gRPC
+### 6.080. Version `3.x.y` — Yellowstone gRPC
Objectif : ajouter le connecteur gRPC dédié.
À faire :
@@ -1187,7 +1195,7 @@ Réalisé / à maintenir :
Validé en `0.7.30` : classification fine des événements décodés via `eventLifecycleKind`, `eventActionability` et `nonTradeUseful`, avec diagnostics associés et sans changement volontaire sur les trades/candles.
-À poursuivre en `0.7.31` : transactions inconnues et protocol candidates ; puis en `0.7.32` : matérialisation contrôlée des événements non-trade utiles.
+À poursuivre après `0.7.31` : transactions inconnues/protocol candidates, puis matérialisation contrôlée des événements non-trade utiles, sans alimenter les trades/candles actionnables.
## 11. Documentation et livrables de référence
Le projet doit maintenir au minimum :
@@ -1203,7 +1211,7 @@ Le projet doit maintenir au minimum :
La priorité immédiate est désormais la suivante :
-1. conserver la validation acquise `0.7.28` : transactions failed traçables mais non actionnables, aucun trade/candle candidate sans payload montant/prix exploitable, aucun diagnostic bloquant masqué,
+1. conserver la validation acquise `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles, aucun trade/candle candidate sans payload montant/prix exploitable, aucun diagnostic bloquant masqué,
2. utiliser la matrice `0.7.29` (`kb_lib/src/dex_support_matrix.rs`) comme source commune pour le catalogue DEX, les mappings program id -> protocole, la classification transactionnelle et les protocol candidates,
3. garder les clients HTTP/WS et managers réseau hors du refactor DEX tant qu’ils ne bloquent pas le pipeline,
4. consolider les événements non-trade sans les confondre avec les trades/candles : lifecycle de pool, liquidité, fees, rewards, admin/config, migration et launch/mint,
diff --git a/kb_demo_app/frontend/demo_pipeline2.html b/kb_demo_app/frontend/demo_pipeline2.html
index 45273f0..7bf1936 100644
--- a/kb_demo_app/frontend/demo_pipeline2.html
+++ b/kb_demo_app/frontend/demo_pipeline2.html
@@ -166,10 +166,11 @@
diff --git a/kb_demo_app/frontend/ts/demo_pipeline2.ts b/kb_demo_app/frontend/ts/demo_pipeline2.ts
index 6502f10..9888d74 100644
--- a/kb_demo_app/frontend/ts/demo_pipeline2.ts
+++ b/kb_demo_app/frontend/ts/demo_pipeline2.ts
@@ -61,6 +61,7 @@ interface LocalPipelineReplayResult {
analyticSignalUpsertCount: number;
tokenMetadataUpdatedCount: number;
pairSymbolUpdatedCount: number;
+ resetMarketMaterializationDeletedCount: number;
globalErrorCount: number;
}
function appendLogLine(textarea: HTMLTextAreaElement, line: string): void {
@@ -617,7 +618,7 @@ document.addEventListener("DOMContentLoaded", async () => {
appendLogLine(
logTextarea,
- `[ui] local pipeline replay completed: ${result.replayedTransactionCount.toString()} replayed, ${result.tradeEventCount.toString()} trades, ${result.pairCandleUpsertCount.toString()} candle upserts`,
+ `[ui] local pipeline replay completed: ${result.replayedTransactionCount.toString()} replayed, ${result.tradeEventCount.toString()} trades, ${result.pairCandleUpsertCount.toString()} candle upserts, resetDeleted='${result.resetMarketMaterializationDeletedCount.toString()}'`,
);
await refreshCatalog();
diff --git a/kb_demo_app/package.json b/kb_demo_app/package.json
index de84e7c..9cc9dd5 100644
--- a/kb_demo_app/package.json
+++ b/kb_demo_app/package.json
@@ -1,7 +1,7 @@
{
"name": "kb-demo-app",
"private": true,
- "version": "0.7.29",
+ "version": "0.7.31",
"type": "module",
"scripts": {
"dev": "vite",
diff --git a/kb_demo_app/src/demo_pipeline2.rs b/kb_demo_app/src/demo_pipeline2.rs
index a116f5c..1ad418b 100644
--- a/kb_demo_app/src/demo_pipeline2.rs
+++ b/kb_demo_app/src/demo_pipeline2.rs
@@ -971,7 +971,7 @@ pub(crate) async fn demo_pipeline2_validate_local_pipeline(
let service = kb_lib::LocalPipelineValidationService::new(database.clone());
let profile_code = match request {
Some(request) => request.profile_code,
- None => "0.7.30_non_trade_event_classification".to_string(),
+ None => "0.7.31_trade_event_actionability_policy".to_string(),
};
let run_result = match profile_code.as_str() {
"0.7.27" | "0.7.27_dexes_non_regression" => {
@@ -986,6 +986,9 @@ pub(crate) async fn demo_pipeline2_validate_local_pipeline(
"0.7.30" | "0.7.30_non_trade_event_classification" => {
service.validate_v0_7_30_current_database().await
},
+ "0.7.31" | "0.7.31_trade_event_actionability_policy" => {
+ service.validate_v0_7_31_current_database().await
+ },
other => Err(kb_lib::Error::InvalidState(format!(
"unsupported local pipeline validation profile: {other}"
))),
@@ -1356,6 +1359,7 @@ pub(crate) async fn demo_pipeline2_replay_local_pipeline(
limit,
refresh_missing_token_metadata,
token_metadata_limit,
+ reset_market_materialization_before_replay: true,
};
let database = state.database.clone();
let service = if refresh_missing_token_metadata {
diff --git a/kb_demo_app/tauri.conf.json b/kb_demo_app/tauri.conf.json
index fcd9877..77cee40 100644
--- a/kb_demo_app/tauri.conf.json
+++ b/kb_demo_app/tauri.conf.json
@@ -1,7 +1,7 @@
{
"$schema": "https://schema.tauri.app/config/2",
"productName": "kb-demo-app",
- "version": "0.7.29",
+ "version": "0.7.31",
"identifier": "com.sasedev.kb-demo-app",
"build": {
"beforeDevCommand": "npm run dev",
diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs
index 128bead..9214ffa 100644
--- a/kb_lib/src/db.rs
+++ b/kb_lib/src/db.rs
@@ -207,6 +207,7 @@ pub use queries::query_trade_events_get_by_decoded_event_id;
pub use queries::query_trade_events_list_by_pair_id;
pub use queries::query_trade_events_list_by_transaction_id;
pub use queries::query_trade_events_upsert;
+pub use queries::query_trade_market_materialization_delete_all;
pub use queries::query_transaction_classifications_get_by_signature;
pub use queries::query_transaction_classifications_get_by_transaction_id;
pub use queries::query_transaction_classifications_list_recent;
diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs
index ebe5492..8f3ac91 100644
--- a/kb_lib/src/db/queries.rs
+++ b/kb_lib/src/db/queries.rs
@@ -145,6 +145,7 @@ pub use trade_event::query_trade_events_get_by_decoded_event_id;
pub use trade_event::query_trade_events_list_by_pair_id;
pub use trade_event::query_trade_events_list_by_transaction_id;
pub use trade_event::query_trade_events_upsert;
+pub use trade_event::query_trade_market_materialization_delete_all;
pub use transaction_classification::query_transaction_classifications_get_by_signature;
pub use transaction_classification::query_transaction_classifications_get_by_transaction_id;
pub use transaction_classification::query_transaction_classifications_list_recent;
diff --git a/kb_lib/src/db/queries/trade_event.rs b/kb_lib/src/db/queries/trade_event.rs
index 60bc25a..0050c95 100644
--- a/kb_lib/src/db/queries/trade_event.rs
+++ b/kb_lib/src/db/queries/trade_event.rs
@@ -2,6 +2,41 @@
//! Queries for `k_sol_trade_events`.
+/// Deletes all local market materialization rows that are rebuilt from trade events.
+///
+/// The local replay pipeline is deterministic over persisted raw transactions. Clearing
+/// these derived tables before replay prevents stale failed-transaction trades, metrics
+/// or candles from surviving after actionability rules change.
+pub async fn query_trade_market_materialization_delete_all(
+ database: &crate::Database,
+) -> Result {
+ match database.connection() {
+ crate::DatabaseConnection::Sqlite(pool) => {
+ let statements = [
+ ("k_sol_pair_analytic_signals", "DELETE FROM k_sol_pair_analytic_signals"),
+ ("k_sol_pair_candles", "DELETE FROM k_sol_pair_candles"),
+ ("k_sol_pair_metrics", "DELETE FROM k_sol_pair_metrics"),
+ ("k_sol_trade_events", "DELETE FROM k_sol_trade_events"),
+ ];
+ let mut deleted_count = 0_u64;
+ for (table_name, statement) in statements {
+ let query_result = sqlx::query(statement).execute(pool).await;
+ let result = match query_result {
+ Ok(result) => result,
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot clear {} during local market materialization reset on sqlite: {}",
+ table_name, error
+ )));
+ },
+ };
+ deleted_count += result.rows_affected();
+ }
+ return Ok(deleted_count);
+ },
+ }
+}
+
/// Inserts or updates one trade-event row and returns its stable internal id.
pub async fn query_trade_events_upsert(
database: &crate::Database,
diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs
index ac3e10d..07bc89d 100644
--- a/kb_lib/src/lib.rs
+++ b/kb_lib/src/lib.rs
@@ -687,6 +687,8 @@ pub use db::query_trade_events_list_by_pair_id;
pub use db::query_trade_events_list_by_transaction_id;
/// Inserts or updates one trade-event row and returns its stable internal id.
pub use db::query_trade_events_upsert;
+/// Deletes all local market materialization rows rebuilt from trade events.
+pub use db::query_trade_market_materialization_delete_all;
/// Reads one transaction classification by signature.
pub use db::query_transaction_classifications_get_by_signature;
/// Reads one transaction classification by transaction id.
diff --git a/kb_lib/src/local_pipeline_replay.rs b/kb_lib/src/local_pipeline_replay.rs
index 80cfbd1..cf4b331 100644
--- a/kb_lib/src/local_pipeline_replay.rs
+++ b/kb_lib/src/local_pipeline_replay.rs
@@ -16,6 +16,8 @@ pub struct LocalPipelineReplayConfig {
pub refresh_missing_token_metadata: bool,
/// Maximum number of missing token metadata rows to resolve.
pub token_metadata_limit: std::option::Option,
+ /// Whether locally replayed market materialization tables are reset before replay.
+ pub reset_market_materialization_before_replay: bool,
}
impl Default for LocalPipelineReplayConfig {
@@ -24,6 +26,7 @@ impl Default for LocalPipelineReplayConfig {
limit: Some(10_000),
refresh_missing_token_metadata: false,
token_metadata_limit: Some(250),
+ reset_market_materialization_before_replay: true,
};
}
}
@@ -71,6 +74,8 @@ pub struct LocalPipelineReplayResult {
pub token_metadata_updated_count: usize,
/// Number of pair symbols updated after replay.
pub pair_symbol_updated_count: usize,
+ /// Number of derived market materialization rows deleted before replay.
+ pub reset_market_materialization_deleted_count: u64,
/// Number of errors outside per-signature replay.
pub global_error_count: usize,
}
@@ -120,6 +125,19 @@ impl LocalPipelineReplayService {
Ok(signatures) => signatures,
Err(error) => return Err(error),
};
+ let mut reset_market_materialization_deleted_count = 0_u64;
+ if config.reset_market_materialization_before_replay {
+ let reset_result =
+ crate::query_trade_market_materialization_delete_all(self.database.as_ref()).await;
+ reset_market_materialization_deleted_count = match reset_result {
+ Ok(deleted_count) => deleted_count,
+ Err(error) => return Err(error),
+ };
+ tracing::debug!(
+ deleted_count = reset_market_materialization_deleted_count,
+ "local pipeline replay reset market materialization tables"
+ );
+ }
let dex_decode = crate::DexDecodeService::new(self.database.clone());
let dex_detect = crate::DexDetectService::new(self.database.clone());
let trade_aggregation = crate::TradeAggregationService::new(self.database.clone());
@@ -130,6 +148,7 @@ impl LocalPipelineReplayService {
crate::TransactionClassificationService::new(self.database.clone());
let mut result = LocalPipelineReplayResult {
selected_transaction_count: signatures.len(),
+ reset_market_materialization_deleted_count,
..Default::default()
};
for signature in signatures {
diff --git a/kb_lib/src/local_pipeline_validation.rs b/kb_lib/src/local_pipeline_validation.rs
index 793665e..cd8701f 100644
--- a/kb_lib/src/local_pipeline_validation.rs
+++ b/kb_lib/src/local_pipeline_validation.rs
@@ -38,6 +38,8 @@ pub struct LocalPipelineValidationConfig {
pub require_trade_events_per_dex: bool,
/// Whether each DEX summary must contain at least one candle.
pub require_candles_per_dex: bool,
+ /// Whether non-actionable classification groups must have no linked trade events.
+ pub require_no_non_actionable_trade_events_materialized: bool,
}
impl Default for LocalPipelineValidationConfig {
@@ -56,6 +58,7 @@ impl Default for LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: true,
require_candles_per_dex: true,
+ require_no_non_actionable_trade_events_materialized: true,
};
}
}
@@ -82,6 +85,7 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: true,
require_candles_per_dex: true,
+ require_no_non_actionable_trade_events_materialized: true,
};
}
@@ -113,10 +117,10 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: false,
require_candles_per_dex: false,
+ require_no_non_actionable_trade_events_materialized: true,
};
}
-
/// Builds the `0.7.29` DEX support matrix baseline validation config.
///
/// This profile preserves the `0.7.28` trade/candle non-regression checks
@@ -144,6 +148,7 @@ impl LocalPipelineValidationConfig {
require_decoded_events_per_dex: true,
require_trade_events_per_dex: false,
require_candles_per_dex: false,
+ require_no_non_actionable_trade_events_materialized: true,
};
}
@@ -157,6 +162,18 @@ impl LocalPipelineValidationConfig {
config.profile_code = "0.7.30_non_trade_event_classification".to_string();
return config;
}
+
+ /// Builds the `0.7.31` trade-event actionability validation config.
+ ///
+ /// This profile keeps the `0.7.30` classification checks and additionally
+ /// requires failed/non-actionable classification groups to have no linked
+ /// persisted trade events.
+ pub fn v0_7_31_trade_event_actionability_policy() -> Self {
+ let mut config = Self::v0_7_30_non_trade_event_classification();
+ config.profile_code = "0.7.31_trade_event_actionability_policy".to_string();
+ config.require_no_non_actionable_trade_events_materialized = true;
+ return config;
+ }
}
/// A single local pipeline validation issue.
@@ -277,7 +294,6 @@ impl LocalPipelineValidationService {
return self.validate_current_database(&config).await;
}
-
/// Diagnoses the current database with the `0.7.29` DEX matrix baseline profile.
pub async fn validate_v0_7_29_current_database(
&self,
@@ -293,6 +309,15 @@ impl LocalPipelineValidationService {
let config = crate::LocalPipelineValidationConfig::v0_7_30_non_trade_event_classification();
return self.validate_current_database(&config).await;
}
+
+ /// Diagnoses the current database with the `0.7.31` trade actionability profile.
+ pub async fn validate_v0_7_31_current_database(
+ &self,
+ ) -> Result {
+ let config =
+ crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
+ return self.validate_current_database(&config).await;
+ }
}
/// Validates a diagnostics summary without performing database access.
@@ -386,6 +411,24 @@ pub fn validate_local_pipeline_diagnostics_summary(
blocking: true,
});
}
+ if config.require_no_non_actionable_trade_events_materialized {
+ for classification_summary in &summary.event_classification_summaries {
+ if classification_summary.event_actionability != "trade_candidate"
+ && classification_summary.trade_event_count > 0
+ {
+ issues.push(LocalPipelineValidationIssueDto {
+ code: "non_actionable_trade_events_materialized".to_string(),
+ message: format!(
+ "classification '{}' has {} linked trade event(s)",
+ classification_summary.event_actionability,
+ classification_summary.trade_event_count
+ ),
+ subject: Some(classification_summary.event_actionability.clone()),
+ blocking: true,
+ });
+ }
+ }
+ }
if config.require_all_expected_dexes {
for expected_dex_code in &expected_dex_codes {
if !observed_dex_codes.contains(expected_dex_code) {
@@ -453,8 +496,7 @@ pub fn validate_local_pipeline_diagnostics_summary(
expected_dex_codes,
observed_dex_codes,
decoded_non_trade_useful_event_count: summary.decoded_non_trade_useful_event_count,
- decoded_non_actionable_trade_event_count: summary
- .decoded_non_actionable_trade_event_count,
+ decoded_non_actionable_trade_event_count: summary.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: summary.decoded_unknown_event_count,
dex_support_matrix_entry_count: crate::dex_support_matrix_entries().len() as i64,
dex_support_matrix: crate::dex_support_matrix_entry_dtos(),
@@ -670,6 +712,63 @@ mod tests {
assert_eq!(report.decoded_unknown_event_count, 0);
}
+ #[test]
+ fn validation_accepts_0_7_31_trade_actionability_policy_summary() {
+ let mut summary = make_0_7_28_summary_with_meteora();
+ summary.event_classification_summaries.push(
+ crate::LocalEventClassificationDiagnosticSummaryDto {
+ event_category: "trade".to_string(),
+ event_lifecycle_kind: "trade_swap".to_string(),
+ event_actionability: "failed_transaction".to_string(),
+ non_trade_useful: false,
+ event_count: 10,
+ decoded_trade_candidate_count: 0,
+ decoded_candle_candidate_count: 0,
+ trade_event_count: 0,
+ },
+ );
+ summary.event_classification_summaries.push(
+ crate::LocalEventClassificationDiagnosticSummaryDto {
+ event_category: "trade".to_string(),
+ event_lifecycle_kind: "trade_swap".to_string(),
+ event_actionability: "trade_candidate".to_string(),
+ non_trade_useful: false,
+ event_count: 12,
+ decoded_trade_candidate_count: 12,
+ decoded_candle_candidate_count: 12,
+ trade_event_count: 12,
+ },
+ );
+ let config =
+ crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
+ let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
+ assert!(report.validation_passed);
+ assert_eq!(report.validation_profile_code, "0.7.31_trade_event_actionability_policy");
+ }
+
+ #[test]
+ fn validation_rejects_0_7_31_failed_transaction_trade_events() {
+ let mut summary = make_0_7_28_summary_with_meteora();
+ summary.event_classification_summaries.push(
+ crate::LocalEventClassificationDiagnosticSummaryDto {
+ event_category: "trade".to_string(),
+ event_lifecycle_kind: "trade_swap".to_string(),
+ event_actionability: "failed_transaction".to_string(),
+ non_trade_useful: false,
+ event_count: 10,
+ decoded_trade_candidate_count: 0,
+ decoded_candle_candidate_count: 0,
+ trade_event_count: 2,
+ },
+ );
+ let config =
+ crate::LocalPipelineValidationConfig::v0_7_31_trade_event_actionability_policy();
+ let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
+ assert!(!report.validation_passed);
+ assert_eq!(report.blocking_issue_count, 1);
+ assert_eq!(report.issues[0].code, "non_actionable_trade_events_materialized");
+ }
+
#[test]
fn validation_report_exposes_dex_support_matrix() {
let summary = make_0_7_28_summary_with_meteora();
diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs
index 4905a1b..48a23da 100644
--- a/kb_lib/src/trade_aggregation.rs
+++ b/kb_lib/src/trade_aggregation.rs
@@ -47,6 +47,13 @@ impl TradeAggregationService {
Err(error) => return Err(error),
};
let transaction = transaction_context.transaction;
+ if transaction.err_json.is_some() {
+ tracing::debug!(
+ signature = %transaction.signature,
+ "skipping trade aggregation for failed transaction"
+ );
+ return Ok(std::vec::Vec::new());
+ }
let transaction_id = transaction_context.transaction_id;
let decoded_events = transaction_context.decoded_events;
let mut results = std::vec::Vec::new();
@@ -221,11 +228,12 @@ mod tests {
return std::sync::Arc::new(database);
}
- async fn seed_fluxbeam_swap_transaction(
+ async fn seed_fluxbeam_swap_transaction_with_err(
database: std::sync::Arc,
signature: &str,
base_amount_raw: &str,
quote_amount_raw: &str,
+ meta_err: serde_json::Value,
) {
let transaction_model = crate::TransactionModelService::new(database.clone());
let dex_decode = crate::DexDecodeService::new(database.clone());
@@ -263,7 +271,7 @@ mod tests {
}
},
"meta": {
- "err": null,
+ "err": meta_err,
"logMessages": [
"Program log: Instruction: Swap",
"Program log: buy"
@@ -290,6 +298,38 @@ mod tests {
}
}
+ async fn seed_fluxbeam_swap_transaction(
+ database: std::sync::Arc,
+ signature: &str,
+ base_amount_raw: &str,
+ quote_amount_raw: &str,
+ ) {
+ seed_fluxbeam_swap_transaction_with_err(
+ database,
+ signature,
+ base_amount_raw,
+ quote_amount_raw,
+ serde_json::Value::Null,
+ )
+ .await;
+ }
+
+ async fn seed_failed_fluxbeam_swap_transaction(
+ database: std::sync::Arc,
+ signature: &str,
+ base_amount_raw: &str,
+ quote_amount_raw: &str,
+ ) {
+ seed_fluxbeam_swap_transaction_with_err(
+ database,
+ signature,
+ base_amount_raw,
+ quote_amount_raw,
+ serde_json::json!({ "InstructionError": [0, { "Custom": 1 }] }),
+ )
+ .await;
+ }
+
#[tokio::test]
async fn record_transaction_by_signature_creates_trade_event_and_pair_metric() {
let database = make_database().await;
@@ -375,4 +415,49 @@ mod tests {
};
assert_eq!(pair_metric.trade_count, 1);
}
+
+ #[tokio::test]
+ async fn record_transaction_by_signature_skips_failed_transaction() {
+ let database = make_database().await;
+ seed_failed_fluxbeam_swap_transaction(
+ database.clone(),
+ "sig-trade-aggregation-failed-1",
+ "1000",
+ "2500",
+ )
+ .await;
+ let transaction_result = crate::query_chain_transactions_get_by_signature(
+ database.as_ref(),
+ "sig-trade-aggregation-failed-1",
+ )
+ .await;
+ let transaction_option = match transaction_result {
+ Ok(transaction_option) => transaction_option,
+ Err(error) => panic!("transaction fetch must succeed: {}", error),
+ };
+ let transaction = match transaction_option {
+ Some(transaction) => transaction,
+ None => panic!("transaction must exist"),
+ };
+ let transaction_id = match transaction.id {
+ Some(transaction_id) => transaction_id,
+ None => panic!("transaction id must exist"),
+ };
+ let service = crate::TradeAggregationService::new(database.clone());
+ let record_result =
+ service.record_transaction_by_signature("sig-trade-aggregation-failed-1").await;
+ let results = match record_result {
+ Ok(results) => results,
+ Err(error) => panic!("failed transaction aggregation must not fail: {}", error),
+ };
+ assert_eq!(results.len(), 0);
+ let trade_events_result =
+ crate::query_trade_events_list_by_transaction_id(database.as_ref(), transaction_id)
+ .await;
+ let trade_events = match trade_events_result {
+ Ok(trade_events) => trade_events,
+ Err(error) => panic!("trade event list must succeed: {}", error),
+ };
+ assert_eq!(trade_events.len(), 0);
+ }
}