diff --git a/CHANGELOG.md b/CHANGELOG.md
index e8f6639..ed2fc6e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,5 +62,6 @@
0.7.29 - Ajout d’une matrice DEX commune (`dex_support_matrix`) utilisée par le catalogue DEX, la classification transactionnelle et l’enregistrement des protocol candidates ; ajout du profil de validation `0.7.29_multi_dex_matrix_baseline` exposant la matrice dans le rapport de validation ; préparation explicite des surfaces planifiées sans inventer de program ids non vérifiés.
0.7.30 - Ajout d’une taxonomie DEX plus fine pour les événements décodés : `eventLifecycleKind`, `eventActionability`, `nonTradeUseful`, compteurs diagnostics des événements non-trade utiles, trades non actionnables et classifications inconnues ; ajout du profil `0.7.30_non_trade_event_classification` sans modification volontaire de la matérialisation trade/candle.
0.7.31 - Application de la politique Option B : les transactions failed restent traçables dans les événements décodés mais ne peuvent plus alimenter `trade_events`, metrics ou candles ; le replay local réinitialise les tables de matérialisation marché avant reconstruction pour supprimer les anciennes lignes dérivées non actionnables.
-0.7.32 - Clarification de la sémantique des diagnostics locaux : séparation des gaps littéraux de paires et des gaps bloquants/actionnables, ajout des compteurs de matérialisation par paire, résumé `pairActionabilitySummaries`, profil `0.7.32_validation_report_semantics` et garde-fous sur la matrice DEX sans modification de la matérialisation trade/candle.
-0.7.33 - Ajout de la classification diagnostique `pairTradingReadiness` pour les paires, avec `quoteAssetClass`, `tradingRouteRequired`, résumé `pairTradingReadinessSummaries`, profil de validation `0.7.33_pair_trading_readiness` et mise à jour de la sélection UI Demo Pipeline 2 sans modifier la matérialisation trade/candle.
+0.7.32 - Clarification des sémantiques de validation locale : distinction entre gaps littéraux, gaps bloquants et paires actionnables, afin d’éviter de bloquer sur des paires détectées mais non matérialisées par trade.
+0.7.33 - Ajout du profil `0.7.33_pair_trading_readiness`, avec classification des paires directes WSOL, directes stable, inverses stable/WSOL et cross-quotes nécessitant un router.
+0.7.34 - Ajout du profil `0.7.34_non_trade_liquidity_lifecycle`, matérialisation des tables non-trade liquidité/lifecycle, warning non bloquant pour DEX attendus absents du corpus local, première tranche DLMM : `add_liquidity`, `remove_liquidity`, `initialize_position`, `initialize_bin_array`, intégration de la matérialisation non-trade dans les backfills token/pool ciblés, et distinction `PositionOpen`/`PositionClose` dans `LiquidityEventKind`.
diff --git a/Cargo.toml b/Cargo.toml
index b923d7a..8b1828d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
-version = "0.7.33"
+version = "0.7.34"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"
diff --git a/README.md b/README.md
index f9240e3..48c23cd 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
`khadhroony-bobobot` est un workspace Rust destiné à la détection, au décodage, à l’analyse et, à terme, au trading semi-automatisé de tokens Solana.
-Le README précédent décrivait surtout l’état `0.3.1`. Ce fichier reflète l’état de reprise autour de `0.7.33` : le socle transport HTTP/WS, la résolution transactionnelle, le modèle SQLite, plusieurs connecteurs DEX, les candles, les signaux analytiques, la validation locale et une matrice DEX commune existent déjà.
+Le README précédent décrivait surtout l’état `0.3.1`. Ce fichier reflète l’état de reprise autour de `0.7.34` : le socle transport HTTP/WS, la résolution transactionnelle, le modèle SQLite, plusieurs connecteurs DEX, les candles, les signaux analytiques, la validation locale et une matrice DEX commune existent déjà.
## 1. Objectif
@@ -31,7 +31,7 @@ Le workspace contient deux crates principales.
La logique métier doit rester dans `kb_lib`. `kb_demo_app` doit rester une façade UI/Tauri et ne doit pas récupérer de logique Solana ou DEX profonde.
-## 3. État actuel autour de `0.7.33`
+## 3. État actuel autour de `0.7.34`
### 3.1. Socle stabilisé à ne pas refactorer maintenant
@@ -97,11 +97,7 @@ La distinction importante est la suivante :
Depuis `0.7.29`, la matrice de support DEX est portée par `kb_lib/src/dex_support_matrix.rs`. Elle centralise le code interne, la famille, la version, le type de surface, les program ids vérifiés localement, le statut de support, les capacités actuelles et les raisons de skip.
-Depuis `0.7.30`, les événements décodés reçoivent aussi une classification plus fine : `eventLifecycleKind`, `eventActionability` et `nonTradeUseful`. Cette classification sert aux diagnostics et prépare la matérialisation future des événements non-trade sans alimenter directement les trades/candles.
-
-Depuis `0.7.32`, les diagnostics distinguent explicitement les gaps littéraux de catalogue (`literalPairWithoutTradeCount`, `literalPairWithoutCandleCount`) des gaps bloquants/actionnables (`blockingPairWithoutTradeCount`, `blockingPairWithoutCandleCount`). Les anciens champs `pairWithoutTradeCount` et `pairWithoutCandleCount` restent exposés comme alias de compatibilité pour les gaps bloquants/actionnables.
-
-Depuis `0.7.33`, les diagnostics ajoutent une classification `pairTradingReadiness` au niveau des paires et des résumés agrégés `pairTradingReadinessSummaries`. Cette classification sépare les paires directement lisibles/tradables contre WSOL ou stable, les paires inversées avec WSOL/stable en base, les paires cross-quote nécessitant un routeur/aggregator, les paires non matérialisées en trade et les cas de quote inconnue. Elle reste purement diagnostique : elle ne modifie ni le replay, ni les `trade_events`, ni les candles.
+Depuis `0.7.30`, les événements décodés reçoivent aussi une classification plus fine : `eventLifecycleKind`, `eventActionability` et `nonTradeUseful`. Depuis `0.7.34`, cette classification commence à alimenter les tables non-trade utiles, sans alimenter directement les trades/candles. Le backfill ciblé token/pool appelle aussi cette matérialisation afin que les compteurs `liquidityEventCount` et `poolLifecycleEventCount` soient cohérents sans devoir lancer un replay local séparé.
| Code cible | Type | Statut `0.7.29` | Prochaine action |
|---|---:|---|---|
@@ -175,6 +171,24 @@ Avant d’étendre trop agressivement les DEX, ces tables doivent être stabilis
`k_sol_liquidity_events` existe déjà et doit être stabilisée/étendue plutôt que recréée sans nécessité.
+### 5.3. État `0.7.34` des événements non-trade
+
+Le profil `0.7.34_non_trade_liquidity_lifecycle` valide deux choses distinctes :
+
+- les tables et diagnostics non-trade existent pour compter `liquidityEventCount` et `poolLifecycleEventCount` ;
+- les décodeurs doivent maintenant émettre les événements utiles, au lieu de les classer comme inconnus ou de les ignorer.
+
+Première tranche couverte : Meteora DLMM. Les discriminants et hints suivants sont pris en charge comme événements non-trade utiles :
+
+| Event kind | Catégorie | Effet attendu |
+|---|---|---|
+| `meteora_dlmm.add_liquidity` | liquidity | persistance dans `k_sol_liquidity_events` quand le replay matérialise les decoded events |
+| `meteora_dlmm.remove_liquidity` | liquidity | persistance dans `k_sol_liquidity_events` |
+| `meteora_dlmm.initialize_position` | liquidity / position open | persistance dans `k_sol_liquidity_events` avec `LiquidityEventKind::PositionOpen`, sans génération de trade/candle |
+| `meteora_dlmm.initialize_bin_array` | pool lifecycle | persistance dans `k_sol_pool_lifecycle_events` |
+
+Invariant maintenu : ces événements peuvent améliorer le scoring, le contexte de pool et le diagnostic, mais ne doivent jamais créer directement de `trade_events`, pair metrics ou candles.
+
## 6. Politique de refactor actuelle
Le code et la documentation sont vivants. Les refactors agressifs sont acceptables lorsque cela rend le pipeline plus propre et plus durable, à condition de respecter ces limites :
@@ -219,18 +233,16 @@ Les tests peuvent rester plus souples lorsque cela clarifie le test.
La reprise doit suivre cet ordre :
-1. conserver la classification `0.7.33` : les paires matérialisées doivent être classées par readiness trading sans transformer les paires cross-quote ou inversées en erreurs bloquantes ;
-2. conserver la sémantique `0.7.32` : les gaps littéraux de catalogue ne doivent pas être confondus avec les gaps bloquants/actionnables utilisés par la validation ;
-3. conserver la non-régression `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles ;
-4. utiliser la matrice `0.7.29` comme source commune pour le catalogue, la classification et les protocol candidates ;
-5. relier progressivement les événements non-trade aux tables existantes : lifecycle, liquidité, fees, rewards, admin ;
-6. consolider Meteora, surtout `meteora_dlmm` et le cas partiel `meteora_damm_v1` ;
-7. ajouter les launch surfaces manquantes comme origines de mint : LaunchLab/Launchpad, LetsBonk/Bonk.fun, Boop.fun, Moonshot/Moonit, Believe, Bags ;
-8. traiter Heaven ;
-9. consolider Orca/FluxBeam/DexLab ;
-10. isoler Raydium AMM v4 legacy ;
-11. effectuer une validation DEX v1 consolidée ;
-12. reprendre ensuite l’UI analytique et les vues token/pair/pool.
+1. conserver la non-régression `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles ;
+2. utiliser la matrice `0.7.29` comme source commune pour le catalogue, la classification et les protocol candidates ;
+3. relier progressivement les événements non-trade aux tables existantes : lifecycle, liquidité, fees, rewards, admin ;
+4. consolider Meteora, surtout `meteora_dlmm` et le cas partiel `meteora_damm_v1` ;
+5. ajouter les launch surfaces manquantes comme origines de mint : LaunchLab/Launchpad, LetsBonk/Bonk.fun, Boop.fun, Moonshot/Moonit, Believe, Bags ;
+6. traiter Heaven ;
+7. consolider Orca/FluxBeam/DexLab ;
+8. isoler Raydium AMM v4 legacy ;
+9. effectuer une validation DEX v1 consolidée ;
+10. reprendre ensuite l’UI analytique et les vues token/pair/pool.
## 9. Fichiers utiles pour reprendre dans une nouvelle session
diff --git a/ROADMAP.md b/ROADMAP.md
index 9d94b38..8ef818c 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -846,38 +846,27 @@ Réalisé :
- exposer `resetMarketMaterializationDeletedCount` dans le résultat de replay UI ;
- conserver la validation multi-DEX et la matrice DEX comme garde-fous avant d’ajouter les surfaces restantes.
-### 6.064. Version `0.7.32` — Sémantique des diagnostics et compteurs de validation
+### 6.064. Version `0.7.32` — Transactions inconnues et protocol candidates
Réalisé :
-- conserver la politique `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles ;
-- clarifier que `pairWithoutTradeCount` et `pairWithoutCandleCount` sont des compteurs de gaps bloquants/actionnables, pas des compteurs littéraux sur tout le catalogue ;
-- ajouter `literalPairWithoutTradeCount` et `literalPairWithoutCandleCount` pour les paires de catalogue sans trade/candle matérialisé ;
-- ajouter `blockingPairWithoutTradeCount` et `blockingPairWithoutCandleCount` comme noms explicites des anciens compteurs bloquants ;
-- ajouter les compteurs de matérialisation par paire : `tradeMaterializedPairCount`, `candleMaterializedPairCount`, `actionablePairCount`, `candleBucketTimeframeCount` et `candlesAreBucketed` ;
-- ajouter `pairActionabilitySummaries` pour distinguer les paires matérialisées, actionnables sans matérialisation, candidates failed, non-actionables, décodées sans trade candidate et catalog-only ;
-- ajouter le profil `0.7.32_validation_report_semantics` ;
-- ajouter des garde-fous de validation sur la matrice DEX : entrées `supported` entièrement matérialisées, entrées `partial` avec `skipReason`, entrées `planned/to_verify` non activées au catalogue ;
-- ne pas modifier la logique de replay, trade aggregation ou candle aggregation validée en `0.7.31`.
+- consolider `k_sol_transaction_classifications`, déjà présente, avec les catégories utiles au suivi DEX,
+- consolider `k_sol_protocol_candidates`, déjà présente, pour prioriser les programmes inconnus ou partiellement reconnus,
+- classifier les transactions résolues en catégories : known supported, known partial, known non-trade, unknown program, unknown protocol candidate, unknown event kind, failed transaction, non-actionable trade,
+- conserver les `program_id`, comptes, signatures, préfixes de `data`, logs et indices d’instructions utiles à l’analyse,
+- créer des requêtes de diagnostic pour repérer les programmes inconnus fréquents,
+- permettre de promouvoir plus tard un protocol candidate vers un vrai DEX/surface sans perdre l’historique,
+- garantir que ces tables n’alimentent jamais directement les trades/candles.
-Repoussé après cette clarification : consolider les transactions inconnues et protocol candidates sans polluer les trades/candles.
-
-### 6.065. Version `0.7.33` — Readiness trading des paires
+### 6.065. Version `0.7.33` — Pair trading readiness et routes de cotation
Réalisé :
-- ajouter une classification diagnostique `pairTradingReadiness` pour chaque paire inspectée localement ;
-- distinguer `direct_wsol_quote`, `direct_stable_quote`, `inverse_wsol_base`, `inverse_stable_base`, `cross_quote_requires_router`, `unknown_quote` et `non_trade_materialized` ;
-- exposer `quoteAssetClass` et `tradingRouteRequired` dans les diagnostics par paire ;
-- ajouter `pairTradingReadinessSummaries` dans le résumé local du pipeline ;
-- ajouter le profil `0.7.33_pair_trading_readiness` ;
-- valider que les résumés de readiness couvrent toutes les paires et restent cohérents avec les compteurs `tradeMaterializedPairCount`, `tradeEventCount` et `pairCandleCount` ;
-- ne pas modifier la logique de replay, `trade_events`, metrics ou candles.
-
-Objectif : préparer la future couche d’achat/vente en distinguant les paires immédiatement exploitables contre WSOL/stable des paires qui nécessitent inversion de lecture ou routeur/aggregator.
+- classifier les paires `direct_wsol_quote`, `direct_stable_quote`, `inverse_wsol_base`, `inverse_stable_base`, `cross_quote_requires_router` et `non_trade_materialized` ;
+- exposer `quoteAssetClass` et `tradingRouteRequired` dans les diagnostics ;
+- éviter de bloquer la validation sur des paires seulement listées ou détectées sans trade matérialisé ;
+- préparer la future sélection des paires directement tradables versus paires nécessitant un router.
### 6.066. Version `0.7.34` — Événements non-trade v1 : liquidité et cycle de vie pool
-Objectif : exploiter les événements utiles à l’analyse et au trading semi-automatique sans les mélanger avec les swaps/candles.
-
-À faire :
+Réalisé :
- stabiliser et étendre `k_sol_liquidity_events` au lieu de la recréer inutilement,
- ajouter `k_sol_pool_lifecycle_events`,
@@ -886,6 +875,10 @@ Objectif : exploiter les événements utiles à l’analyse et au trading semi-a
- conserver le `payload_json` source pour audit,
- alimenter les diagnostics locaux avec les compteurs liquidité/lifecycle,
- garantir qu’un événement de liquidité ou de cycle de vie ne produit jamais de candle directement.
+- première tranche DLMM : reconnaître et persister `meteora_dlmm.add_liquidity`, `meteora_dlmm.remove_liquidity`, `meteora_dlmm.initialize_position` et `meteora_dlmm.initialize_bin_array` comme événements non-trade utiles ;
+- intégrer la matérialisation non-trade dans les backfills ciblés token/pool, pas uniquement dans le replay local, afin que les diagnostics reflètent immédiatement les événements DLMM non-trade décodés ;
+- distinguer `PositionOpen` et `PositionClose` dans `LiquidityEventKind` au lieu de rabattre les positions CLMM/DLMM sur `Add`/`Remove` ;
+- conserver `meteora_damm_v1` manquant comme warning non bloquant lorsque le corpus de backfill local ne contient pas ce DEX.
### 6.067. Version `0.7.35` — Événements non-trade v2 : fees, rewards et administration
Objectif : conserver les événements utiles au risque, au scoring, à l’économie du pool et à la traçabilité opérationnelle.
@@ -1227,19 +1220,17 @@ Le projet doit maintenir au minimum :
La priorité immédiate est désormais la suivante :
1. conserver la validation acquise `0.7.31` : transactions failed traçables mais exclues des `trade_events`, metrics et candles, aucun trade/candle candidate sans payload montant/prix exploitable, aucun diagnostic bloquant masqué,
-2. conserver la clarification `0.7.32` entre gaps littéraux de catalogue et gaps bloquants/actionnables,
-3. conserver la classification `0.7.33` des paires par readiness trading : direct WSOL/stable, inverse WSOL/stable, cross-quote avec routeur requis, inconnue ou non matérialisée,
-4. utiliser la matrice `0.7.29` (`kb_lib/src/dex_support_matrix.rs`) comme source commune pour le catalogue DEX, les mappings program id -> protocole, la classification transactionnelle et les protocol candidates,
-5. garder les clients HTTP/WS et managers réseau hors du refactor DEX tant qu’ils ne bloquent pas le pipeline,
-6. consolider les événements non-trade sans les confondre avec les trades/candles : lifecycle de pool, liquidité, fees, rewards, admin/config, migration et launch/mint,
-6. rattacher les launch surfaces aux tokens et aux pools migrés : Raydium LaunchLab/Launchpad, LetsBonk/Bonk.fun, Boop.fun, Moonshot/Moonit, Believe, Bags et Heaven,
-7. consolider Meteora avec corpus fiable : `meteora_dlmm`, `meteora_damm_v1`, `meteora_damm_v2`, `meteora_dbc` et `meteora_dlc` si le programme est confirmé,
-8. consolider Orca, FluxBeam et DexLab sur corpus,
-9. traiter `raydium_amm_v4` legacy seulement après les autres Raydium, avec corpus dédié prouvant le programme `675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8`,
-10. ajouter une matérialisation dédiée des transactions inconnues ou partiellement décodées pour analyser les DEX manquants sans polluer les trades/candles,
-11. effectuer une validation DEX v1 consolidée sur tous les connecteurs supportés avant de considérer la couche DEX `0.7.x` comme stable,
-12. ajouter ensuite les overlays des signaux analytiques sur les candles,
-13. consolider les vues métier `token / pair / pool` dans `kb_demo_app`, y compris les événements liquidité, lifecycle, fees, rewards et admin,
-14. stabiliser l’ergonomie, les filtres, la pagination et la navigation de l’UI d’inspection,
-15. préparer ensuite l’ouverture de `0.8.x` pour l’analyse, les filtres, les patterns et les projections graphiques,
-16. préparer enfin Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle HTTP / WS existant.
+2. utiliser la matrice `0.7.29` (`kb_lib/src/dex_support_matrix.rs`) comme source commune pour le catalogue DEX, les mappings program id -> protocole, la classification transactionnelle et les protocol candidates,
+3. garder les clients HTTP/WS et managers réseau hors du refactor DEX tant qu’ils ne bloquent pas le pipeline,
+4. consolider les événements non-trade sans les confondre avec les trades/candles : lifecycle de pool, liquidité, fees, rewards, admin/config, migration et launch/mint,
+5. rattacher les launch surfaces aux tokens et aux pools migrés : Raydium LaunchLab/Launchpad, LetsBonk/Bonk.fun, Boop.fun, Moonshot/Moonit, Believe, Bags et Heaven,
+6. consolider Meteora avec corpus fiable : `meteora_dlmm`, `meteora_damm_v1`, `meteora_damm_v2`, `meteora_dbc` et `meteora_dlc` si le programme est confirmé,
+7. consolider Orca, FluxBeam et DexLab sur corpus,
+8. traiter `raydium_amm_v4` legacy seulement après les autres Raydium, avec corpus dédié prouvant le programme `675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8`,
+9. ajouter une matérialisation dédiée des transactions inconnues ou partiellement décodées pour analyser les DEX manquants sans polluer les trades/candles,
+10. effectuer une validation DEX v1 consolidée sur tous les connecteurs supportés avant de considérer la couche DEX `0.7.x` comme stable,
+11. ajouter ensuite les overlays des signaux analytiques sur les candles,
+12. consolider les vues métier `token / pair / pool` dans `kb_demo_app`, y compris les événements liquidité, lifecycle, fees, rewards et admin,
+13. stabiliser l’ergonomie, les filtres, la pagination et la navigation de l’UI d’inspection,
+14. préparer ensuite l’ouverture de `0.8.x` pour l’analyse, les filtres, les patterns et les projections graphiques,
+15. préparer enfin Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle HTTP / WS existant.
diff --git a/kb_demo_app/frontend/demo_pipeline2.html b/kb_demo_app/frontend/demo_pipeline2.html
index 7fa04fa..86a3e9c 100644
--- a/kb_demo_app/frontend/demo_pipeline2.html
+++ b/kb_demo_app/frontend/demo_pipeline2.html
@@ -166,7 +166,8 @@
Validation profile
- 0.7.33 — pair trading readiness
+ 0.7.34 — non-trade liquidity/lifecycle
+ 0.7.33 — pair trading readiness
0.7.32 — validation report semantics
0.7.31 — trade event actionability policy
0.7.30 — non-trade event classification
diff --git a/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineDiagnosticSummary.ts b/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineDiagnosticSummary.ts
index f2e29e1..caaeaee 100644
--- a/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineDiagnosticSummary.ts
+++ b/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineDiagnosticSummary.ts
@@ -52,6 +52,14 @@ decodedNonActionableTradeEventCount: number,
* Total decoded events with unknown classification.
*/
decodedUnknownEventCount: number,
+/**
+ * Total persisted liquidity events.
+ */
+liquidityEventCount: number,
+/**
+ * Total persisted pool lifecycle events.
+ */
+poolLifecycleEventCount: number,
/**
* Whether the local persisted pipeline has no blocking diagnostic issue.
*/
diff --git a/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineValidationReport.ts b/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineValidationReport.ts
index cc74a4f..9848b93 100644
--- a/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineValidationReport.ts
+++ b/kb_demo_app/frontend/ts/bindings/DemoPipeline2LocalPipelineValidationReport.ts
@@ -42,6 +42,14 @@ decodedNonActionableTradeEventCount: number,
* Total decoded events with unknown classification.
*/
decodedUnknownEventCount: number,
+/**
+ * Total persisted liquidity events.
+ */
+liquidityEventCount: number,
+/**
+ * Total persisted pool lifecycle events.
+ */
+poolLifecycleEventCount: number,
/**
* Number of entries currently exposed by the DEX support matrix.
*/
diff --git a/kb_demo_app/frontend/ts/demo_pipeline2.ts b/kb_demo_app/frontend/ts/demo_pipeline2.ts
index 9888d74..825673b 100644
--- a/kb_demo_app/frontend/ts/demo_pipeline2.ts
+++ b/kb_demo_app/frontend/ts/demo_pipeline2.ts
@@ -14,7 +14,6 @@ import type { DemoPipeline2BackfillPayload } from "./bindings/DemoPipeline2Backf
import type { DemoPipeline2PairCandlesRequest } from "./bindings/DemoPipeline2PairCandlesRequest.ts";
import type { DemoPipeline2PairCandlesPayload } from "./bindings/DemoPipeline2PairCandlesPayload.ts";
import type { DemoPipeline2LocalDiagnosticsPayload } from "./bindings/DemoPipeline2LocalDiagnosticsPayload.ts";
-import type { DemoPipeline2LocalValidationRequest } from "./bindings/DemoPipeline2LocalValidationRequest.ts";
import type { DemoPipeline2LocalValidationPayload } from "./bindings/DemoPipeline2LocalValidationPayload.ts";
import type { DemoPipeline2ProgramInstructionDiscriminatorSummaryRequest } from "./bindings/DemoPipeline2ProgramInstructionDiscriminatorSummaryRequest.ts";
import type { DemoPipeline2ProgramInstructionDiscriminatorSummaryPayload } from "./bindings/DemoPipeline2ProgramInstructionDiscriminatorSummaryPayload.ts";
@@ -52,13 +51,18 @@ interface LocalPipelineReplayResult {
decodeErrorCount: number;
detectErrorCount: number;
tradeAggregationErrorCount: number;
+ nonTradeMaterializationErrorCount: number;
pairCandleErrorCount: number;
analyticSignalErrorCount: number;
decodedEventCount: number;
detectionCount: number;
tradeEventCount: number;
+ liquidityEventCount: number;
+ poolLifecycleEventCount: number;
pairCandleUpsertCount: number;
analyticSignalUpsertCount: number;
+ transactionClassificationCount: number;
+ transactionClassificationErrorCount: number;
tokenMetadataUpdatedCount: number;
pairSymbolUpdatedCount: number;
resetMarketMaterializationDeletedCount: number;
@@ -358,7 +362,6 @@ document.addEventListener("DOMContentLoaded", async () => {
const replayMetadataLimitInput = document.querySelector("#demoPipeline2ReplayMetadataLimitInput");
const replayLocalPipelineButton = document.querySelector("#demoPipeline2ReplayLocalPipelineButton");
const diagnoseLocalPipelineButton = document.querySelector("#demoPipeline2DiagnoseLocalPipelineButton");
- const validationProfileSelect = document.querySelector("#demoPipeline2ValidationProfileSelect");
const validateLocalPipelineButton = document.querySelector("#demoPipeline2ValidateLocalPipelineButton");
const discriminatorProgramIdInput = document.querySelector("#demoPipeline2DiscriminatorProgramIdInput");
@@ -405,7 +408,6 @@ document.addEventListener("DOMContentLoaded", async () => {
!replayMetadataLimitInput ||
!replayLocalPipelineButton ||
!diagnoseLocalPipelineButton ||
- !validationProfileSelect ||
!validateLocalPipelineButton ||
!discriminatorProgramIdInput ||
!discriminatorLimitInput ||
@@ -618,7 +620,7 @@ document.addEventListener("DOMContentLoaded", async () => {
appendLogLine(
logTextarea,
- `[ui] local pipeline replay completed: ${result.replayedTransactionCount.toString()} replayed, ${result.tradeEventCount.toString()} trades, ${result.pairCandleUpsertCount.toString()} candle upserts, resetDeleted='${result.resetMarketMaterializationDeletedCount.toString()}'`,
+ `[ui] local pipeline replay completed: ${result.replayedTransactionCount.toString()} replayed, ${result.tradeEventCount.toString()} trades, ${result.liquidityEventCount.toString()} liquidity, ${result.poolLifecycleEventCount.toString()} lifecycle, ${result.pairCandleUpsertCount.toString()} candle upserts, resetDeleted='${result.resetMarketMaterializationDeletedCount.toString()}'`,
);
await refreshCatalog();
@@ -641,7 +643,7 @@ document.addEventListener("DOMContentLoaded", async () => {
appendLogLine(
logTextarea,
- `[ui] local pipeline diagnostics completed: ${payload.summary.decodedEventCount.toString()} decoded, ${payload.summary.tradeEventCount.toString()} trades, ${payload.summary.pairCandleCount.toString()} candles, actionableMissing='${payload.summary.actionableMissingTradeEventCount.toString()}', nonActionablePairs='${payload.summary.nonActionablePairCount.toString()}', blocking='${payload.summary.blockingIssueCount.toString()}', nonTradeUseful='${payload.summary.decodedNonTradeUsefulEventCount.toString()}', nonActionableTrade='${payload.summary.decodedNonActionableTradeEventCount.toString()}', unknownEvents='${payload.summary.decodedUnknownEventCount.toString()}'`,
+ `[ui] local pipeline diagnostics completed: ${payload.summary.decodedEventCount.toString()} decoded, ${payload.summary.tradeEventCount.toString()} trades, ${payload.summary.pairCandleCount.toString()} candles, actionableMissing='${payload.summary.actionableMissingTradeEventCount.toString()}', nonActionablePairs='${payload.summary.nonActionablePairCount.toString()}', blocking='${payload.summary.blockingIssueCount.toString()}'`,
);
} catch (error) {
appendLogLine(logTextarea, `[ui] local pipeline diagnostics error: ${String(error)}`);
@@ -651,16 +653,11 @@ document.addEventListener("DOMContentLoaded", async () => {
});
validateLocalPipelineButton.addEventListener("click", async () => {
- const request: DemoPipeline2LocalValidationRequest = {
- profileCode: validationProfileSelect.value,
- };
-
- appendLogLine(logTextarea, `[ui] validating local pipeline with ${request.profileCode} profile`);
+ appendLogLine(logTextarea, "[ui] validating local pipeline");
try {
const payload = await invoke(
"demo_pipeline2_validate_local_pipeline",
- { request },
);
localValidationTextarea.value = payload.validationJson;
@@ -668,7 +665,7 @@ document.addEventListener("DOMContentLoaded", async () => {
appendLogLine(
logTextarea,
- `[ui] local pipeline validation completed: profile='${payload.run.validationProfileCode}' passed='${payload.run.validationPassed ? "yes" : "no"}' blocking='${payload.run.blockingIssueCount.toString()}' warnings='${payload.run.warningCount.toString()}' dexMatrix='${payload.run.report.dexSupportMatrixEntryCount.toString()}' nonTradeUseful='${payload.run.report.decodedNonTradeUsefulEventCount.toString()}' nonActionableTrade='${payload.run.report.decodedNonActionableTradeEventCount.toString()}' unknownEvents='${payload.run.report.decodedUnknownEventCount.toString()}'`,
+ `[ui] local pipeline validation completed: profile='${payload.run.validationProfileCode}' passed='${payload.run.validationPassed ? "yes" : "no"}' blocking='${payload.run.blockingIssueCount.toString()}' warnings='${payload.run.warningCount.toString()}'`,
);
} catch (error) {
appendLogLine(logTextarea, `[ui] local pipeline validation error: ${String(error)}`);
diff --git a/kb_demo_app/package.json b/kb_demo_app/package.json
index 6320443..9b2aa78 100644
--- a/kb_demo_app/package.json
+++ b/kb_demo_app/package.json
@@ -1,7 +1,7 @@
{
"name": "kb-demo-app",
"private": true,
- "version": "0.7.33",
+ "version": "0.7.34",
"type": "module",
"scripts": {
"dev": "vite",
diff --git a/kb_demo_app/src/demo_pipeline2.rs b/kb_demo_app/src/demo_pipeline2.rs
index e584e6a..d78d128 100644
--- a/kb_demo_app/src/demo_pipeline2.rs
+++ b/kb_demo_app/src/demo_pipeline2.rs
@@ -161,6 +161,12 @@ pub(crate) struct DemoPipeline2LocalPipelineValidationReport {
/// Total decoded events with unknown classification.
#[ts(type = "number")]
pub decoded_unknown_event_count: i64,
+ /// Total persisted liquidity events.
+ #[ts(type = "number")]
+ pub liquidity_event_count: i64,
+ /// Total persisted pool lifecycle events.
+ #[ts(type = "number")]
+ pub pool_lifecycle_event_count: i64,
/// Number of entries currently exposed by the DEX support matrix.
#[ts(type = "number")]
pub dex_support_matrix_entry_count: i64,
@@ -271,6 +277,12 @@ pub(crate) struct DemoPipeline2LocalPipelineDiagnosticSummary {
/// Total decoded events with unknown classification.
#[ts(type = "number")]
pub decoded_unknown_event_count: i64,
+ /// Total persisted liquidity events.
+ #[ts(type = "number")]
+ pub liquidity_event_count: i64,
+ /// Total persisted pool lifecycle events.
+ #[ts(type = "number")]
+ pub pool_lifecycle_event_count: i64,
/// Whether the local persisted pipeline has no blocking diagnostic issue.
pub diagnostics_clean: bool,
/// Number of blocking diagnostic issues.
@@ -1075,7 +1087,7 @@ pub(crate) async fn demo_pipeline2_validate_local_pipeline(
let service = kb_lib::LocalPipelineValidationService::new(database.clone());
let profile_code = match request {
Some(request) => request.profile_code,
- None => "0.7.33_pair_trading_readiness".to_string(),
+ None => "0.7.34_non_trade_liquidity_lifecycle".to_string(),
};
let run_result = match profile_code.as_str() {
"0.7.27" | "0.7.27_dexes_non_regression" => {
@@ -1099,6 +1111,9 @@ pub(crate) async fn demo_pipeline2_validate_local_pipeline(
"0.7.33" | "0.7.33_pair_trading_readiness" => {
service.validate_v0_7_33_current_database().await
},
+ "0.7.34" | "0.7.34_non_trade_liquidity_lifecycle" => {
+ service.validate_v0_7_34_current_database().await
+ },
other => Err(kb_lib::Error::InvalidState(format!(
"unsupported local pipeline validation profile: {other}"
))),
@@ -1528,6 +1543,8 @@ fn demo_pipeline2_map_local_validation_report(
decoded_non_trade_useful_event_count: report.decoded_non_trade_useful_event_count,
decoded_non_actionable_trade_event_count: report.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: report.decoded_unknown_event_count,
+ liquidity_event_count: report.liquidity_event_count,
+ pool_lifecycle_event_count: report.pool_lifecycle_event_count,
dex_support_matrix_entry_count: report.dex_support_matrix_entry_count,
dex_support_matrix,
issues,
@@ -1654,6 +1671,8 @@ fn demo_pipeline2_map_local_diagnostics_summary(
decoded_non_trade_useful_event_count: summary.decoded_non_trade_useful_event_count,
decoded_non_actionable_trade_event_count: summary.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: summary.decoded_unknown_event_count,
+ liquidity_event_count: summary.liquidity_event_count,
+ pool_lifecycle_event_count: summary.pool_lifecycle_event_count,
diagnostics_clean: summary.diagnostics_clean,
blocking_issue_count: summary.blocking_issue_count,
missing_trade_event_count: summary.missing_trade_event_count,
diff --git a/kb_demo_app/tauri.conf.json b/kb_demo_app/tauri.conf.json
index 7d92a72..a5be4a8 100644
--- a/kb_demo_app/tauri.conf.json
+++ b/kb_demo_app/tauri.conf.json
@@ -1,7 +1,7 @@
{
"$schema": "https://schema.tauri.app/config/2",
"productName": "kb-demo-app",
- "version": "0.7.33",
+ "version": "0.7.34",
"identifier": "com.sasedev.kb-demo-app",
"build": {
"beforeDevCommand": "npm run dev",
diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs
index d6e8dd5..70ae02f 100644
--- a/kb_lib/src/db.rs
+++ b/kb_lib/src/db.rs
@@ -50,6 +50,7 @@ pub use dtos::PairCandleDto;
pub use dtos::PairDto;
pub use dtos::PairMetricDto;
pub use dtos::PoolDto;
+pub use dtos::PoolLifecycleEventDto;
pub use dtos::PoolListingDto;
pub use dtos::PoolOriginDto;
pub use dtos::PoolTokenDto;
@@ -87,6 +88,7 @@ pub use entities::PairCandleEntity;
pub use entities::PairEntity;
pub use entities::PairMetricEntity;
pub use entities::PoolEntity;
+pub use entities::PoolLifecycleEventEntity;
pub use entities::PoolListingEntity;
pub use entities::PoolOriginEntity;
pub use entities::PoolTokenEntity;
@@ -177,6 +179,9 @@ pub use queries::query_pairs_get_by_pool_id;
pub use queries::query_pairs_list;
pub use queries::query_pairs_update_symbol;
pub use queries::query_pairs_upsert;
+pub use queries::query_pool_lifecycle_events_get_by_decoded_event_id;
+pub use queries::query_pool_lifecycle_events_list_recent;
+pub use queries::query_pool_lifecycle_events_upsert;
pub use queries::query_pool_listings_get_by_pool_id;
pub use queries::query_pool_listings_list;
pub use queries::query_pool_listings_upsert;
diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs
index 4fb512c..0707ec7 100644
--- a/kb_lib/src/db/dtos.rs
+++ b/kb_lib/src/db/dtos.rs
@@ -25,10 +25,10 @@ mod pair_candle;
mod pair_metric;
mod pool;
mod pool_listing;
+mod pool_lifecycle_event;
mod pool_origin;
mod pool_token;
mod program_instruction_diagnostic;
-mod program_instruction_discriminator_summary;
mod protocol_candidate;
mod protocol_candidate_summary;
mod swap;
@@ -39,22 +39,24 @@ mod trade_event;
mod transaction_classification;
mod wallet;
mod wallet_holding;
+mod program_instruction_discriminator_summary;
mod wallet_participation;
pub(crate) use local_pipeline_diagnostics::LocalDecodedEventDiagnosticSummaryRow;
+pub(crate) use local_pipeline_diagnostics::LocalEventClassificationDiagnosticSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalDexDiagnosticSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalDuplicateDecodedEventTradeDiagnosticSampleRow;
-pub(crate) use local_pipeline_diagnostics::LocalEventClassificationDiagnosticSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalMissingTradeEventDiagnosticSampleRow;
pub(crate) use local_pipeline_diagnostics::LocalMissingTradeEventReasonSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalMultiTradeSignaturePairDiagnosticSampleRow;
pub(crate) use local_pipeline_diagnostics::LocalNonActionablePairDiagnosticSummaryRow;
-pub(crate) use local_pipeline_diagnostics::LocalPairActionabilityDiagnosticSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalPairDiagnosticSummaryRow;
-pub(crate) use local_pipeline_diagnostics::LocalPairGapDiagnosticSampleRow;
+pub(crate) use local_pipeline_diagnostics::LocalPairActionabilityDiagnosticSummaryRow;
pub(crate) use local_pipeline_diagnostics::LocalPairTradingReadinessDiagnosticSummaryRow;
+pub(crate) use local_pipeline_diagnostics::LocalPairGapDiagnosticSampleRow;
pub(crate) use local_pipeline_diagnostics::LocalPipelineDiagnosticCountersRow;
+pub use program_instruction_discriminator_summary::ProgramInstructionDiscriminatorSummaryDto;
pub use analysis_signal::AnalysisSignalDto;
pub use chain_instruction::ChainInstructionDto;
pub use chain_slot::ChainSlotDto;
@@ -70,17 +72,17 @@ pub use launch_surface::LaunchSurfaceDto;
pub use launch_surface_key::LaunchSurfaceKeyDto;
pub use liquidity_event::LiquidityEventDto;
pub use local_pipeline_diagnostics::LocalDecodedEventDiagnosticSummaryDto;
+pub use local_pipeline_diagnostics::LocalEventClassificationDiagnosticSummaryDto;
pub use local_pipeline_diagnostics::LocalDexDiagnosticSummaryDto;
pub use local_pipeline_diagnostics::LocalDuplicateDecodedEventTradeDiagnosticSampleDto;
-pub use local_pipeline_diagnostics::LocalEventClassificationDiagnosticSummaryDto;
pub use local_pipeline_diagnostics::LocalMissingTradeEventDiagnosticSampleDto;
pub use local_pipeline_diagnostics::LocalMissingTradeEventReasonSummaryDto;
pub use local_pipeline_diagnostics::LocalMultiTradeSignaturePairDiagnosticSampleDto;
pub use local_pipeline_diagnostics::LocalNonActionablePairDiagnosticSummaryDto;
-pub use local_pipeline_diagnostics::LocalPairActionabilityDiagnosticSummaryDto;
pub use local_pipeline_diagnostics::LocalPairDiagnosticSummaryDto;
-pub use local_pipeline_diagnostics::LocalPairGapDiagnosticSampleDto;
+pub use local_pipeline_diagnostics::LocalPairActionabilityDiagnosticSummaryDto;
pub use local_pipeline_diagnostics::LocalPairTradingReadinessDiagnosticSummaryDto;
+pub use local_pipeline_diagnostics::LocalPairGapDiagnosticSampleDto;
pub use local_pipeline_diagnostics::LocalPipelineDiagnosticCountersDto;
pub use local_pipeline_diagnostics::LocalPipelineDiagnosticSummaryDto;
pub use observed_token::ObservedTokenDto;
@@ -91,10 +93,10 @@ pub use pair_candle::PairCandleDto;
pub use pair_metric::PairMetricDto;
pub use pool::PoolDto;
pub use pool_listing::PoolListingDto;
+pub use pool_lifecycle_event::PoolLifecycleEventDto;
pub use pool_origin::PoolOriginDto;
pub use pool_token::PoolTokenDto;
pub use program_instruction_diagnostic::ProgramInstructionDiagnosticDto;
-pub use program_instruction_discriminator_summary::ProgramInstructionDiscriminatorSummaryDto;
pub use protocol_candidate::ProtocolCandidateDto;
pub use protocol_candidate_summary::ProtocolCandidateSummaryDto;
pub use swap::SwapDto;
diff --git a/kb_lib/src/db/dtos/liquidity_event.rs b/kb_lib/src/db/dtos/liquidity_event.rs
index d6bab71..4638eea 100644
--- a/kb_lib/src/db/dtos/liquidity_event.rs
+++ b/kb_lib/src/db/dtos/liquidity_event.rs
@@ -7,6 +7,10 @@
pub struct LiquidityEventDto {
/// Optional numeric primary key.
pub id: std::option::Option,
+ /// Optional related transaction id.
+ pub transaction_id: std::option::Option,
+ /// Optional related decoded DEX event id.
+ pub decoded_event_id: std::option::Option,
/// Related DEX id.
pub dex_id: i64,
/// Related pool id.
@@ -19,8 +23,12 @@ pub struct LiquidityEventDto {
pub instruction_index: i64,
/// Optional slot number.
pub slot: std::option::Option,
+ /// Optional program id that emitted the decoded event.
+ pub program_id: std::option::Option,
/// Liquidity event kind.
pub event_kind: crate::LiquidityEventKind,
+ /// Optional original decoded event kind.
+ pub event_kind_text: std::option::Option,
/// Optional actor wallet.
pub actor_wallet: std::option::Option,
/// Base token id.
@@ -35,8 +43,14 @@ pub struct LiquidityEventDto {
pub quote_amount: std::string::String,
/// Optional LP amount as decimal text.
pub lp_amount: std::option::Option,
+ /// Whether the persisted amount fields are complete.
+ pub amounts_are_complete: bool,
+ /// Optional source decoded payload JSON.
+ pub payload_json: std::option::Option,
/// Execution timestamp.
pub executed_at: chrono::DateTime,
+ /// Creation timestamp.
+ pub created_at: chrono::DateTime,
}
impl LiquidityEventDto {
@@ -57,15 +71,20 @@ impl LiquidityEventDto {
quote_amount: std::string::String,
lp_amount: std::option::Option,
) -> Self {
+ let now = chrono::Utc::now();
return Self {
id: None,
+ transaction_id: None,
+ decoded_event_id: None,
dex_id,
pool_id,
pair_id,
signature,
instruction_index,
slot,
+ program_id: None,
event_kind,
+ event_kind_text: None,
actor_wallet,
base_token_id,
quote_token_id,
@@ -73,9 +92,32 @@ impl LiquidityEventDto {
base_amount,
quote_amount,
lp_amount,
- executed_at: chrono::Utc::now(),
+ amounts_are_complete: true,
+ payload_json: None,
+ executed_at: now,
+ created_at: now,
};
}
+
+ /// Adds decoded-event linkage and audit payload metadata to the DTO.
+ #[allow(clippy::too_many_arguments)]
+ pub fn with_decoded_event_metadata(
+ mut self,
+ transaction_id: std::option::Option,
+ decoded_event_id: std::option::Option,
+ program_id: std::option::Option,
+ event_kind_text: std::option::Option,
+ payload_json: std::option::Option,
+ amounts_are_complete: bool,
+ ) -> Self {
+ self.transaction_id = transaction_id;
+ self.decoded_event_id = decoded_event_id;
+ self.program_id = program_id;
+ self.event_kind_text = event_kind_text;
+ self.payload_json = payload_json;
+ self.amounts_are_complete = amounts_are_complete;
+ return self;
+ }
}
impl TryFrom for LiquidityEventDto {
@@ -114,13 +156,17 @@ impl TryFrom for LiquidityEventDto {
};
return Ok(Self {
id: Some(entity.id),
+ transaction_id: entity.transaction_id,
+ decoded_event_id: entity.decoded_event_id,
dex_id: entity.dex_id,
pool_id: entity.pool_id,
pair_id: entity.pair_id,
signature: entity.signature,
instruction_index: entity.instruction_index,
slot,
+ program_id: entity.program_id,
event_kind,
+ event_kind_text: entity.event_kind_text,
actor_wallet: entity.actor_wallet,
base_token_id: entity.base_token_id,
quote_token_id: entity.quote_token_id,
@@ -128,7 +174,27 @@ impl TryFrom for LiquidityEventDto {
base_amount: entity.base_amount,
quote_amount: entity.quote_amount,
lp_amount: entity.lp_amount,
+ amounts_are_complete: match entity.amounts_are_complete {
+ Some(amounts_are_complete) => amounts_are_complete != 0,
+ None => true,
+ },
+ payload_json: entity.payload_json,
executed_at,
+ created_at: match entity.created_at {
+ Some(created_at) => {
+ let created_at_result = chrono::DateTime::parse_from_rfc3339(&created_at);
+ match created_at_result {
+ Ok(created_at) => created_at.with_timezone(&chrono::Utc),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot parse liquidity event created_at '{}': {}",
+ created_at, error
+ )));
+ },
+ }
+ },
+ None => executed_at,
+ },
});
}
}
diff --git a/kb_lib/src/db/dtos/local_pipeline_diagnostics.rs b/kb_lib/src/db/dtos/local_pipeline_diagnostics.rs
index f9c87f8..034a6b7 100644
--- a/kb_lib/src/db/dtos/local_pipeline_diagnostics.rs
+++ b/kb_lib/src/db/dtos/local_pipeline_diagnostics.rs
@@ -23,6 +23,10 @@ pub struct LocalPipelineDiagnosticSummaryDto {
pub decoded_non_actionable_trade_event_count: i64,
/// Total decoded events with unknown classification.
pub decoded_unknown_event_count: i64,
+ /// Total persisted liquidity events.
+ pub liquidity_event_count: i64,
+ /// Total persisted pool lifecycle events.
+ pub pool_lifecycle_event_count: i64,
/// Whether the local persisted pipeline has no blocking diagnostic issue.
pub diagnostics_clean: bool,
/// Number of blocking diagnostic issues.
@@ -363,6 +367,10 @@ pub struct LocalPipelineDiagnosticCountersDto {
pub decoded_non_actionable_trade_event_count: i64,
/// Total decoded events with unknown classification.
pub decoded_unknown_event_count: i64,
+ /// Total persisted liquidity events.
+ pub liquidity_event_count: i64,
+ /// Total persisted pool lifecycle events.
+ pub pool_lifecycle_event_count: i64,
/// Total decoded trade candidates without trade event, including ignored failed transactions.
pub missing_trade_event_count: i64,
/// Explicit alias for decoded trade candidates without linked trade event.
@@ -433,6 +441,8 @@ pub(crate) struct LocalPipelineDiagnosticCountersRow {
pub(crate) decoded_non_trade_useful_event_count: i64,
pub(crate) decoded_non_actionable_trade_event_count: i64,
pub(crate) decoded_unknown_event_count: i64,
+ pub(crate) liquidity_event_count: i64,
+ pub(crate) pool_lifecycle_event_count: i64,
pub(crate) missing_trade_event_count: i64,
pub(crate) decoded_trade_candidate_without_trade_event_count: i64,
pub(crate) decoded_trade_candidate_without_trade_event_on_ok_transaction_count: i64,
diff --git a/kb_lib/src/db/dtos/pool_lifecycle_event.rs b/kb_lib/src/db/dtos/pool_lifecycle_event.rs
new file mode 100644
index 0000000..dff3e01
--- /dev/null
+++ b/kb_lib/src/db/dtos/pool_lifecycle_event.rs
@@ -0,0 +1,145 @@
+// file: kb_lib/src/db/dtos/pool_lifecycle_event.rs
+
+//! Pool lifecycle event DTO.
+
+/// Application-facing normalized pool lifecycle event DTO.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct PoolLifecycleEventDto {
+ /// Optional numeric primary key.
+ pub id: std::option::Option,
+ /// Related transaction id.
+ pub transaction_id: i64,
+ /// Related decoded DEX event id, when available.
+ pub decoded_event_id: std::option::Option,
+ /// Related DEX id, when the DEX row is known.
+ pub dex_id: std::option::Option,
+ /// Related pool id, when the pool row is known.
+ pub pool_id: std::option::Option,
+ /// Related pair id, when the pair row is known.
+ pub pair_id: std::option::Option,
+ /// Transaction signature.
+ pub signature: std::string::String,
+ /// Optional slot number.
+ pub slot: std::option::Option,
+ /// Protocol name that emitted the decoded event.
+ pub protocol_name: std::string::String,
+ /// Program id that emitted the decoded event.
+ pub program_id: std::string::String,
+ /// Stable decoded event kind.
+ pub event_kind: std::string::String,
+ /// Pool account address, when decoded.
+ pub pool_account: std::option::Option,
+ /// First token mint, when decoded.
+ pub token_a_mint: std::option::Option,
+ /// Second token mint, when decoded.
+ pub token_b_mint: std::option::Option,
+ /// Source decoded payload JSON.
+ pub payload_json: std::string::String,
+ /// Execution timestamp.
+ pub executed_at: chrono::DateTime,
+ /// Creation timestamp.
+ pub created_at: chrono::DateTime,
+}
+
+impl PoolLifecycleEventDto {
+ /// Creates a new pool lifecycle event DTO.
+ #[allow(clippy::too_many_arguments)]
+ pub fn new(
+ transaction_id: i64,
+ decoded_event_id: std::option::Option,
+ dex_id: std::option::Option,
+ pool_id: std::option::Option,
+ pair_id: std::option::Option,
+ signature: std::string::String,
+ slot: std::option::Option,
+ protocol_name: std::string::String,
+ program_id: std::string::String,
+ event_kind: std::string::String,
+ pool_account: std::option::Option,
+ token_a_mint: std::option::Option,
+ token_b_mint: std::option::Option,
+ payload_json: std::string::String,
+ ) -> Self {
+ let now = chrono::Utc::now();
+ return Self {
+ id: None,
+ transaction_id,
+ decoded_event_id,
+ dex_id,
+ pool_id,
+ pair_id,
+ signature,
+ slot,
+ protocol_name,
+ program_id,
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ payload_json,
+ executed_at: now,
+ created_at: now,
+ };
+ }
+}
+
+impl TryFrom for PoolLifecycleEventDto {
+ type Error = crate::Error;
+
+ fn try_from(entity: crate::PoolLifecycleEventEntity) -> Result {
+ let executed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.executed_at);
+ let executed_at = match executed_at_result {
+ Ok(executed_at) => executed_at.with_timezone(&chrono::Utc),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot parse pool lifecycle event executed_at '{}': {}",
+ entity.executed_at, error
+ )));
+ },
+ };
+ let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at);
+ let created_at = match created_at_result {
+ Ok(created_at) => created_at.with_timezone(&chrono::Utc),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot parse pool lifecycle event created_at '{}': {}",
+ entity.created_at, error
+ )));
+ },
+ };
+ let slot = match entity.slot {
+ Some(slot) => {
+ let slot_result = u64::try_from(slot);
+ match slot_result {
+ Ok(slot) => Some(slot),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot convert pool lifecycle event slot '{}' to u64: {}",
+ slot, error
+ )));
+ },
+ }
+ },
+ None => None,
+ };
+ return Ok(Self {
+ id: Some(entity.id),
+ transaction_id: entity.transaction_id,
+ decoded_event_id: entity.decoded_event_id,
+ dex_id: entity.dex_id,
+ pool_id: entity.pool_id,
+ pair_id: entity.pair_id,
+ signature: entity.signature,
+ slot,
+ protocol_name: entity.protocol_name,
+ program_id: entity.program_id,
+ event_kind: entity.event_kind,
+ pool_account: entity.pool_account,
+ token_a_mint: entity.token_a_mint,
+ token_b_mint: entity.token_b_mint,
+ payload_json: entity.payload_json,
+ executed_at,
+ created_at,
+ });
+ }
+}
diff --git a/kb_lib/src/db/entities.rs b/kb_lib/src/db/entities.rs
index 8b33c19..0c60fe5 100644
--- a/kb_lib/src/db/entities.rs
+++ b/kb_lib/src/db/entities.rs
@@ -26,6 +26,7 @@ mod pair_candle;
mod pair_metric;
mod pool;
mod pool_listing;
+mod pool_lifecycle_event;
mod pool_origin;
mod pool_token;
mod program_instruction_diagnostic;
@@ -64,6 +65,7 @@ pub use pair_candle::PairCandleEntity;
pub use pair_metric::PairMetricEntity;
pub use pool::PoolEntity;
pub use pool_listing::PoolListingEntity;
+pub use pool_lifecycle_event::PoolLifecycleEventEntity;
pub use pool_origin::PoolOriginEntity;
pub use pool_token::PoolTokenEntity;
pub use program_instruction_diagnostic::ProgramInstructionDiagnosticEntity;
diff --git a/kb_lib/src/db/entities/liquidity_event.rs b/kb_lib/src/db/entities/liquidity_event.rs
index 34c78c7..8e6c3fd 100644
--- a/kb_lib/src/db/entities/liquidity_event.rs
+++ b/kb_lib/src/db/entities/liquidity_event.rs
@@ -7,6 +7,10 @@
pub struct LiquidityEventEntity {
/// Numeric primary key.
pub id: i64,
+ /// Optional related transaction id.
+ pub transaction_id: std::option::Option,
+ /// Optional related decoded DEX event id.
+ pub decoded_event_id: std::option::Option,
/// Related DEX id.
pub dex_id: i64,
/// Related pool id.
@@ -19,8 +23,12 @@ pub struct LiquidityEventEntity {
pub instruction_index: i64,
/// Optional slot number.
pub slot: std::option::Option,
+ /// Optional program id that emitted the decoded event.
+ pub program_id: std::option::Option,
/// Event kind stored as stable integer.
pub event_kind: i16,
+ /// Optional original decoded event kind.
+ pub event_kind_text: std::option::Option,
/// Optional actor wallet.
pub actor_wallet: std::option::Option,
/// Base token id.
@@ -35,6 +43,12 @@ pub struct LiquidityEventEntity {
pub quote_amount: std::string::String,
/// Optional LP amount as decimal text.
pub lp_amount: std::option::Option,
+ /// Whether the persisted amount fields are complete.
+ pub amounts_are_complete: std::option::Option,
+ /// Optional source decoded payload JSON.
+ pub payload_json: std::option::Option,
/// Execution timestamp encoded as RFC3339 UTC text.
pub executed_at: std::string::String,
+ /// Optional creation timestamp encoded as RFC3339 UTC text.
+ pub created_at: std::option::Option,
}
diff --git a/kb_lib/src/db/entities/pool_lifecycle_event.rs b/kb_lib/src/db/entities/pool_lifecycle_event.rs
new file mode 100644
index 0000000..418fb0a
--- /dev/null
+++ b/kb_lib/src/db/entities/pool_lifecycle_event.rs
@@ -0,0 +1,42 @@
+// file: kb_lib/src/db/entities/pool_lifecycle_event.rs
+
+//! Pool lifecycle event entity.
+
+/// Persisted normalized pool lifecycle event row.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
+pub struct PoolLifecycleEventEntity {
+ /// Numeric primary key.
+ pub id: i64,
+ /// Related transaction id.
+ pub transaction_id: i64,
+ /// Related decoded DEX event id, when available.
+ pub decoded_event_id: std::option::Option,
+ /// Related DEX id, when the DEX row is known.
+ pub dex_id: std::option::Option,
+ /// Related pool id, when the pool row is known.
+ pub pool_id: std::option::Option,
+ /// Related pair id, when the pair row is known.
+ pub pair_id: std::option::Option,
+ /// Transaction signature.
+ pub signature: std::string::String,
+ /// Optional slot number.
+ pub slot: std::option::Option,
+ /// Protocol name that emitted the decoded event.
+ pub protocol_name: std::string::String,
+ /// Program id that emitted the decoded event.
+ pub program_id: std::string::String,
+ /// Stable decoded event kind.
+ pub event_kind: std::string::String,
+ /// Pool account address, when decoded.
+ pub pool_account: std::option::Option,
+ /// First token mint, when decoded.
+ pub token_a_mint: std::option::Option,
+ /// Second token mint, when decoded.
+ pub token_b_mint: std::option::Option,
+ /// Source decoded payload JSON.
+ pub payload_json: std::string::String,
+ /// Execution timestamp encoded as RFC3339 UTC text.
+ pub executed_at: std::string::String,
+ /// Creation timestamp encoded as RFC3339 UTC text.
+ pub created_at: std::string::String,
+}
diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs
index 4e2ef14..88401b4 100644
--- a/kb_lib/src/db/queries.rs
+++ b/kb_lib/src/db/queries.rs
@@ -24,6 +24,7 @@ mod pair_analytic_signal;
mod pair_candle;
mod pair_metric;
mod pool;
+mod pool_lifecycle_event;
mod pool_listing;
mod pool_origin;
mod pool_token;
@@ -116,6 +117,9 @@ pub use pair_metric::query_pair_metrics_upsert;
pub use pool::query_pools_get_by_address;
pub use pool::query_pools_list;
pub use pool::query_pools_upsert;
+pub use pool_lifecycle_event::query_pool_lifecycle_events_get_by_decoded_event_id;
+pub use pool_lifecycle_event::query_pool_lifecycle_events_list_recent;
+pub use pool_lifecycle_event::query_pool_lifecycle_events_upsert;
pub use pool_listing::query_pool_listings_get_by_pool_id;
pub use pool_listing::query_pool_listings_list;
pub use pool_listing::query_pool_listings_upsert;
diff --git a/kb_lib/src/db/queries/liquidity_event.rs b/kb_lib/src/db/queries/liquidity_event.rs
index 5bc191a..c2fe30d 100644
--- a/kb_lib/src/db/queries/liquidity_event.rs
+++ b/kb_lib/src/db/queries/liquidity_event.rs
@@ -27,13 +27,17 @@ pub async fn query_liquidity_events_upsert(
let query_result = sqlx::query(
r#"
INSERT INTO k_sol_liquidity_events (
+ transaction_id,
+ decoded_event_id,
dex_id,
pool_id,
pair_id,
signature,
instruction_index,
slot,
+ program_id,
event_kind,
+ event_kind_text,
actor_wallet,
base_token_id,
quote_token_id,
@@ -41,15 +45,22 @@ INSERT INTO k_sol_liquidity_events (
base_amount,
quote_amount,
lp_amount,
- executed_at
+ amounts_are_complete,
+ payload_json,
+ executed_at,
+ created_at
)
-VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(signature, instruction_index) DO UPDATE SET
+ transaction_id = excluded.transaction_id,
+ decoded_event_id = excluded.decoded_event_id,
dex_id = excluded.dex_id,
pool_id = excluded.pool_id,
pair_id = excluded.pair_id,
slot = excluded.slot,
+ program_id = excluded.program_id,
event_kind = excluded.event_kind,
+ event_kind_text = excluded.event_kind_text,
actor_wallet = excluded.actor_wallet,
base_token_id = excluded.base_token_id,
quote_token_id = excluded.quote_token_id,
@@ -57,16 +68,22 @@ ON CONFLICT(signature, instruction_index) DO UPDATE SET
base_amount = excluded.base_amount,
quote_amount = excluded.quote_amount,
lp_amount = excluded.lp_amount,
+ amounts_are_complete = excluded.amounts_are_complete,
+ payload_json = excluded.payload_json,
executed_at = excluded.executed_at
"#,
)
+ .bind(dto.transaction_id)
+ .bind(dto.decoded_event_id)
.bind(dto.dex_id)
.bind(dto.pool_id)
.bind(dto.pair_id)
.bind(dto.signature.clone())
.bind(dto.instruction_index)
.bind(slot_i64)
+ .bind(dto.program_id.clone())
.bind(dto.event_kind.to_i16())
+ .bind(dto.event_kind_text.clone())
.bind(dto.actor_wallet.clone())
.bind(dto.base_token_id)
.bind(dto.quote_token_id)
@@ -74,7 +91,10 @@ ON CONFLICT(signature, instruction_index) DO UPDATE SET
.bind(dto.base_amount.clone())
.bind(dto.quote_amount.clone())
.bind(dto.lp_amount.clone())
+ .bind(if dto.amounts_are_complete { 1_i64 } else { 0_i64 })
+ .bind(dto.payload_json.clone())
.bind(dto.executed_at.to_rfc3339())
+ .bind(dto.created_at.to_rfc3339())
.execute(pool)
.await;
if let Err(error) = query_result {
@@ -122,13 +142,17 @@ pub async fn query_liquidity_events_list_recent(
r#"
SELECT
id,
+ transaction_id,
+ decoded_event_id,
dex_id,
pool_id,
pair_id,
signature,
instruction_index,
slot,
+ program_id,
event_kind,
+ event_kind_text,
actor_wallet,
base_token_id,
quote_token_id,
@@ -136,7 +160,10 @@ SELECT
base_amount,
quote_amount,
lp_amount,
- executed_at
+ amounts_are_complete,
+ payload_json,
+ executed_at,
+ created_at
FROM k_sol_liquidity_events
ORDER BY id DESC
LIMIT ?
diff --git a/kb_lib/src/db/queries/local_pipeline_diagnostics.rs b/kb_lib/src/db/queries/local_pipeline_diagnostics.rs
index 08b02ab..e9d843e 100644
--- a/kb_lib/src/db/queries/local_pipeline_diagnostics.rs
+++ b/kb_lib/src/db/queries/local_pipeline_diagnostics.rs
@@ -48,6 +48,8 @@ SELECT
FROM k_sol_dex_decoded_events
WHERE COALESCE(json_extract(payload_json, '$.eventCategory'), 'unknown') = 'unknown'
) AS decoded_unknown_event_count,
+ (SELECT COUNT(*) FROM k_sol_liquidity_events) AS liquidity_event_count,
+ (SELECT COUNT(*) FROM k_sol_pool_lifecycle_events) AS pool_lifecycle_event_count,
(
SELECT COUNT(*)
FROM k_sol_dex_decoded_events dde
@@ -357,6 +359,8 @@ SELECT
decoded_non_actionable_trade_event_count: row
.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: row.decoded_unknown_event_count,
+ liquidity_event_count: row.liquidity_event_count,
+ pool_lifecycle_event_count: row.pool_lifecycle_event_count,
missing_trade_event_count: row.missing_trade_event_count,
decoded_trade_candidate_without_trade_event_count: row
.decoded_trade_candidate_without_trade_event_count,
diff --git a/kb_lib/src/db/queries/pool_lifecycle_event.rs b/kb_lib/src/db/queries/pool_lifecycle_event.rs
new file mode 100644
index 0000000..5964ec4
--- /dev/null
+++ b/kb_lib/src/db/queries/pool_lifecycle_event.rs
@@ -0,0 +1,294 @@
+// file: kb_lib/src/db/queries/pool_lifecycle_event.rs
+
+//! Queries for `k_sol_pool_lifecycle_events`.
+
+/// Returns one pool lifecycle event by decoded event id.
+pub async fn query_pool_lifecycle_events_get_by_decoded_event_id(
+ database: &crate::Database,
+ decoded_event_id: i64,
+) -> Result, crate::Error> {
+ match database.connection() {
+ crate::DatabaseConnection::Sqlite(pool) => {
+ let query_result = sqlx::query_as::(
+ r#"
+SELECT
+ id,
+ transaction_id,
+ decoded_event_id,
+ dex_id,
+ pool_id,
+ pair_id,
+ signature,
+ slot,
+ protocol_name,
+ program_id,
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ payload_json,
+ executed_at,
+ created_at
+FROM k_sol_pool_lifecycle_events
+WHERE decoded_event_id = ?
+LIMIT 1
+ "#,
+ )
+ .bind(decoded_event_id)
+ .fetch_optional(pool)
+ .await;
+ let entity_option = match query_result {
+ Ok(entity_option) => entity_option,
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot fetch k_sol_pool_lifecycle_events by decoded_event_id '{}' on sqlite: {}",
+ decoded_event_id, error
+ )));
+ },
+ };
+ match entity_option {
+ Some(entity) => {
+ let dto_result = crate::PoolLifecycleEventDto::try_from(entity);
+ match dto_result {
+ Ok(dto) => return Ok(Some(dto)),
+ Err(error) => return Err(error),
+ }
+ },
+ None => return Ok(None),
+ }
+ },
+ }
+}
+
+/// Inserts or updates one normalized pool lifecycle event row.
+pub async fn query_pool_lifecycle_events_upsert(
+ database: &crate::Database,
+ dto: &crate::PoolLifecycleEventDto,
+) -> Result {
+ let slot_i64 = match dto.slot {
+ Some(slot) => {
+ let slot_result = i64::try_from(slot);
+ match slot_result {
+ Ok(slot) => Some(slot),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot convert pool lifecycle event slot '{}' to i64: {}",
+ slot, error
+ )));
+ },
+ }
+ },
+ None => None,
+ };
+ match database.connection() {
+ crate::DatabaseConnection::Sqlite(pool) => {
+ let existing_id = match dto.decoded_event_id {
+ Some(decoded_event_id) => {
+ let existing_result = sqlx::query_scalar::(
+ r#"
+SELECT id
+FROM k_sol_pool_lifecycle_events
+WHERE decoded_event_id = ?
+LIMIT 1
+ "#,
+ )
+ .bind(decoded_event_id)
+ .fetch_optional(pool)
+ .await;
+ match existing_result {
+ Ok(existing_id) => existing_id,
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot fetch k_sol_pool_lifecycle_events id for decoded_event_id '{}' on sqlite: {}",
+ decoded_event_id, error
+ )));
+ },
+ }
+ },
+ None => None,
+ };
+ if let Some(id) = existing_id {
+ let update_result = sqlx::query(
+ r#"
+UPDATE k_sol_pool_lifecycle_events
+SET
+ transaction_id = ?,
+ dex_id = ?,
+ pool_id = ?,
+ pair_id = ?,
+ signature = ?,
+ slot = ?,
+ protocol_name = ?,
+ program_id = ?,
+ event_kind = ?,
+ pool_account = ?,
+ token_a_mint = ?,
+ token_b_mint = ?,
+ payload_json = ?,
+ executed_at = ?
+WHERE id = ?
+ "#,
+ )
+ .bind(dto.transaction_id)
+ .bind(dto.dex_id)
+ .bind(dto.pool_id)
+ .bind(dto.pair_id)
+ .bind(dto.signature.clone())
+ .bind(slot_i64)
+ .bind(dto.protocol_name.clone())
+ .bind(dto.program_id.clone())
+ .bind(dto.event_kind.clone())
+ .bind(dto.pool_account.clone())
+ .bind(dto.token_a_mint.clone())
+ .bind(dto.token_b_mint.clone())
+ .bind(dto.payload_json.clone())
+ .bind(dto.executed_at.to_rfc3339())
+ .bind(id)
+ .execute(pool)
+ .await;
+ if let Err(error) = update_result {
+ return Err(crate::Error::Db(format!(
+ "cannot update k_sol_pool_lifecycle_events id '{}' on sqlite: {}",
+ id, error
+ )));
+ }
+ return Ok(id);
+ }
+ let insert_result = sqlx::query(
+ r#"
+INSERT INTO k_sol_pool_lifecycle_events (
+ transaction_id,
+ decoded_event_id,
+ dex_id,
+ pool_id,
+ pair_id,
+ signature,
+ slot,
+ protocol_name,
+ program_id,
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ payload_json,
+ executed_at,
+ created_at
+)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ "#,
+ )
+ .bind(dto.transaction_id)
+ .bind(dto.decoded_event_id)
+ .bind(dto.dex_id)
+ .bind(dto.pool_id)
+ .bind(dto.pair_id)
+ .bind(dto.signature.clone())
+ .bind(slot_i64)
+ .bind(dto.protocol_name.clone())
+ .bind(dto.program_id.clone())
+ .bind(dto.event_kind.clone())
+ .bind(dto.pool_account.clone())
+ .bind(dto.token_a_mint.clone())
+ .bind(dto.token_b_mint.clone())
+ .bind(dto.payload_json.clone())
+ .bind(dto.executed_at.to_rfc3339())
+ .bind(dto.created_at.to_rfc3339())
+ .execute(pool)
+ .await;
+ if let Err(error) = insert_result {
+ return Err(crate::Error::Db(format!(
+ "cannot insert k_sol_pool_lifecycle_events on sqlite: {}",
+ error
+ )));
+ }
+ let id_result = sqlx::query_scalar::(
+ r#"
+SELECT id
+FROM k_sol_pool_lifecycle_events
+WHERE transaction_id = ?
+ AND protocol_name = ?
+ AND event_kind = ?
+ AND signature = ?
+ORDER BY id DESC
+LIMIT 1
+ "#,
+ )
+ .bind(dto.transaction_id)
+ .bind(dto.protocol_name.clone())
+ .bind(dto.event_kind.clone())
+ .bind(dto.signature.clone())
+ .fetch_one(pool)
+ .await;
+ match id_result {
+ Ok(id) => return Ok(id),
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot fetch inserted k_sol_pool_lifecycle_events id for signature '{}' on sqlite: {}",
+ dto.signature, error
+ )));
+ },
+ }
+ },
+ }
+}
+
+/// Lists recent pool lifecycle events ordered from newest to oldest.
+pub async fn query_pool_lifecycle_events_list_recent(
+ database: &crate::Database,
+ limit: u32,
+) -> Result, crate::Error> {
+ if limit == 0 {
+ return Ok(std::vec::Vec::new());
+ }
+ match database.connection() {
+ crate::DatabaseConnection::Sqlite(pool) => {
+ let query_result = sqlx::query_as::(
+ r#"
+SELECT
+ id,
+ transaction_id,
+ decoded_event_id,
+ dex_id,
+ pool_id,
+ pair_id,
+ signature,
+ slot,
+ protocol_name,
+ program_id,
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ payload_json,
+ executed_at,
+ created_at
+FROM k_sol_pool_lifecycle_events
+ORDER BY id DESC
+LIMIT ?
+ "#,
+ )
+ .bind(i64::from(limit))
+ .fetch_all(pool)
+ .await;
+ let entities = match query_result {
+ Ok(entities) => entities,
+ Err(error) => {
+ return Err(crate::Error::Db(format!(
+ "cannot list k_sol_pool_lifecycle_events on sqlite: {}",
+ error
+ )));
+ },
+ };
+ let mut dtos = std::vec::Vec::new();
+ for entity in entities {
+ let dto_result = crate::PoolLifecycleEventDto::try_from(entity);
+ let dto = match dto_result {
+ Ok(dto) => dto,
+ Err(error) => return Err(error),
+ };
+ dtos.push(dto);
+ }
+ return Ok(dtos);
+ },
+ }
+}
diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs
index 4d5f3bc..9e4bcaf 100644
--- a/kb_lib/src/db/schema.rs
+++ b/kb_lib/src/db/schema.rs
@@ -1072,13 +1072,17 @@ async fn create_tbl_liquidity_events(pool: &sqlx::SqlitePool) -> Result<(), crat
r#"
CREATE TABLE IF NOT EXISTS k_sol_liquidity_events (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ transaction_id INTEGER NULL,
+ decoded_event_id INTEGER NULL,
dex_id INTEGER NOT NULL,
pool_id INTEGER NOT NULL,
pair_id INTEGER NULL,
signature TEXT NOT NULL,
instruction_index INTEGER NOT NULL,
slot INTEGER NULL,
+ program_id TEXT NULL,
event_kind INTEGER NOT NULL,
+ event_kind_text TEXT NULL,
actor_wallet TEXT NULL,
base_token_id INTEGER NOT NULL,
quote_token_id INTEGER NOT NULL,
@@ -1086,7 +1090,12 @@ CREATE TABLE IF NOT EXISTS k_sol_liquidity_events (
base_amount TEXT NOT NULL,
quote_amount TEXT NOT NULL,
lp_amount TEXT NULL,
+ amounts_are_complete INTEGER NOT NULL DEFAULT 1,
+ payload_json TEXT NULL,
executed_at TEXT NOT NULL,
+ created_at TEXT NULL,
+ FOREIGN KEY(transaction_id) REFERENCES k_sol_chain_transactions(id),
+ FOREIGN KEY(decoded_event_id) REFERENCES k_sol_dex_decoded_events(id),
FOREIGN KEY(dex_id) REFERENCES k_sol_dexes(id),
FOREIGN KEY(pool_id) REFERENCES k_sol_pools(id),
FOREIGN KEY(pair_id) REFERENCES k_sol_pairs(id),
diff --git a/kb_lib/src/db/types/liquidity_event_kind.rs b/kb_lib/src/db/types/liquidity_event_kind.rs
index 426e946..525980f 100644
--- a/kb_lib/src/db/types/liquidity_event_kind.rs
+++ b/kb_lib/src/db/types/liquidity_event_kind.rs
@@ -9,6 +9,10 @@ pub enum LiquidityEventKind {
Add,
/// Liquidity removal.
Remove,
+ /// Concentrated-liquidity position opening without a guaranteed amount delta.
+ PositionOpen,
+ /// Concentrated-liquidity position closing without a guaranteed amount delta.
+ PositionClose,
}
impl LiquidityEventKind {
@@ -17,6 +21,8 @@ impl LiquidityEventKind {
match self {
Self::Add => return 0,
Self::Remove => return 1,
+ Self::PositionOpen => return 2,
+ Self::PositionClose => return 3,
}
}
@@ -25,6 +31,8 @@ impl LiquidityEventKind {
match value {
0 => return Ok(Self::Add),
1 => return Ok(Self::Remove),
+ 2 => return Ok(Self::PositionOpen),
+ 3 => return Ok(Self::PositionClose),
_ => {
return Err(crate::Error::Db(format!(
"invalid LiquidityEventKind value: {}",
diff --git a/kb_lib/src/dex.rs b/kb_lib/src/dex.rs
index b1026df..7b98fef 100644
--- a/kb_lib/src/dex.rs
+++ b/kb_lib/src/dex.rs
@@ -38,6 +38,8 @@ pub use meteora_dbc::MeteoraDbcSwapDecoded;
pub use meteora_dlmm::MeteoraDlmmCreatePoolDecoded;
pub use meteora_dlmm::MeteoraDlmmDecodedEvent;
pub use meteora_dlmm::MeteoraDlmmDecoder;
+pub use meteora_dlmm::MeteoraDlmmLiquidityDecoded;
+pub use meteora_dlmm::MeteoraDlmmPoolLifecycleDecoded;
pub use meteora_dlmm::MeteoraDlmmSwapDecoded;
pub use orca_whirlpools::OrcaWhirlpoolsCreatePoolDecoded;
pub use orca_whirlpools::OrcaWhirlpoolsDecodedEvent;
diff --git a/kb_lib/src/dex/meteora_dlmm.rs b/kb_lib/src/dex/meteora_dlmm.rs
index 163d040..f972db7 100644
--- a/kb_lib/src/dex/meteora_dlmm.rs
+++ b/kb_lib/src/dex/meteora_dlmm.rs
@@ -8,9 +8,17 @@
const DLMM_DISCRIMINATOR_CLAIM_FEE2: [u8; 8] = [0x70, 0xbf, 0x65, 0xab, 0x1c, 0x90, 0x7f, 0xbb];
+const DLMM_DISCRIMINATOR_INITIALIZE_BIN_ARRAY: [u8; 8] =
+ [0x23, 0x56, 0x13, 0xb9, 0x4e, 0xd4, 0x4b, 0xd3];
+
const DLMM_DISCRIMINATOR_INITIALIZE_POSITION: [u8; 8] =
[0xdb, 0xc0, 0xea, 0x47, 0xbe, 0xbf, 0x66, 0x50];
+const DLMM_DISCRIMINATOR_ADD_LIQUIDITY: [u8; 8] = [0xb5, 0x9d, 0x59, 0x43, 0x8f, 0xb6, 0x34, 0x48];
+
+const DLMM_DISCRIMINATOR_REMOVE_LIQUIDITY: [u8; 8] =
+ [0x50, 0x55, 0xd1, 0x48, 0x18, 0xce, 0xb1, 0x6c];
+
const DLMM_DISCRIMINATOR_INITIALIZE_LB_PAIR: [u8; 8] =
[0x2d, 0x9a, 0xed, 0xd2, 0xdd, 0x0f, 0xa6, 0x5c];
@@ -89,6 +97,60 @@ pub struct MeteoraDlmmSwapDecoded {
pub payload_json: serde_json::Value,
}
+/// Decoded Meteora DLMM liquidity lifecycle event.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct MeteoraDlmmLiquidityDecoded {
+ /// Parent transaction id.
+ pub transaction_id: i64,
+ /// Parent instruction id.
+ pub instruction_id: i64,
+ /// Transaction signature.
+ pub signature: std::string::String,
+ /// Program id.
+ pub program_id: std::string::String,
+ /// Normalized decoded event kind.
+ pub event_kind: std::string::String,
+ /// Optional DLMM pair/pool account.
+ pub pool_account: std::option::Option,
+ /// Optional token X/base mint.
+ pub token_a_mint: std::option::Option,
+ /// Optional token Y/quote mint.
+ pub token_b_mint: std::option::Option,
+ /// Optional actor wallet or owner account.
+ pub actor_wallet: std::option::Option,
+ /// Optional decoded base/token-X amount.
+ pub base_amount_raw: std::option::Option,
+ /// Optional decoded quote/token-Y amount.
+ pub quote_amount_raw: std::option::Option,
+ /// Optional decoded liquidity amount.
+ pub liquidity_amount_raw: std::option::Option,
+ /// Decoded payload.
+ pub payload_json: serde_json::Value,
+}
+
+/// Decoded Meteora DLMM pool lifecycle event.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct MeteoraDlmmPoolLifecycleDecoded {
+ /// Parent transaction id.
+ pub transaction_id: i64,
+ /// Parent instruction id.
+ pub instruction_id: i64,
+ /// Transaction signature.
+ pub signature: std::string::String,
+ /// Program id.
+ pub program_id: std::string::String,
+ /// Normalized decoded event kind.
+ pub event_kind: std::string::String,
+ /// Optional DLMM pair/pool account.
+ pub pool_account: std::option::Option,
+ /// Optional token X/base mint.
+ pub token_a_mint: std::option::Option,
+ /// Optional token Y/quote mint.
+ pub token_b_mint: std::option::Option,
+ /// Decoded payload.
+ pub payload_json: serde_json::Value,
+}
+
/// Decoded Meteora DLMM event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum MeteoraDlmmDecodedEvent {
@@ -96,12 +158,20 @@ pub enum MeteoraDlmmDecodedEvent {
CreatePool(MeteoraDlmmCreatePoolDecoded),
/// DLMM swap.
Swap(MeteoraDlmmSwapDecoded),
+ /// DLMM liquidity lifecycle event.
+ Liquidity(MeteoraDlmmLiquidityDecoded),
+ /// DLMM pool lifecycle event that is not the canonical create-pool event.
+ PoolLifecycle(MeteoraDlmmPoolLifecycleDecoded),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MeteoraDlmmInstructionKind {
CreatePool,
Swap,
+ LiquidityAdd,
+ LiquidityRemove,
+ PositionOpen,
+ PoolLifecycle,
Ignore,
Unknown,
}
@@ -117,6 +187,9 @@ enum MeteoraDlmmInstructionName {
SwapExactOut,
SwapExactOut2,
SwapWithPriceImpact,
+ InitializeBinArray,
+ AddLiquidity,
+ RemoveLiquidity,
ClaimFee2,
InitializePosition,
Unknown,
@@ -138,6 +211,9 @@ impl MeteoraDlmmInstructionName {
Self::SwapExactOut => return "swap_exact_out",
Self::SwapExactOut2 => return "swap_exact_out2",
Self::SwapWithPriceImpact => return "swap_with_price_impact",
+ Self::InitializeBinArray => return "initialize_bin_array",
+ Self::AddLiquidity => return "add_liquidity",
+ Self::RemoveLiquidity => return "remove_liquidity",
Self::ClaimFee2 => return "claim_fee2",
Self::InitializePosition => return "initialize_position",
Self::Unknown => return "unknown",
@@ -157,8 +233,12 @@ impl MeteoraDlmmInstructionName {
| Self::SwapExactOut
| Self::SwapExactOut2
| Self::SwapWithPriceImpact => return MeteoraDlmmInstructionKind::Swap,
+ Self::AddLiquidity => return MeteoraDlmmInstructionKind::LiquidityAdd,
+ Self::RemoveLiquidity => return MeteoraDlmmInstructionKind::LiquidityRemove,
+ Self::InitializePosition => return MeteoraDlmmInstructionKind::PositionOpen,
+ Self::InitializeBinArray => return MeteoraDlmmInstructionKind::PoolLifecycle,
Self::Unknown => return MeteoraDlmmInstructionKind::Unknown,
- Self::ClaimFee2 | Self::InitializePosition => {
+ Self::ClaimFee2 => {
return MeteoraDlmmInstructionKind::Ignore;
},
}
@@ -249,12 +329,15 @@ impl MeteoraDlmmDecoder {
resolve_dlmm_token_x_mint(instruction_name, parsed_json.as_ref(), &accounts);
let token_b_mint =
resolve_dlmm_token_y_mint(instruction_name, parsed_json.as_ref(), &accounts);
- if pool_account.is_none() || token_a_mint.is_none() || token_b_mint.is_none() {
+ if pool_account.is_none() {
continue;
}
let config_account =
resolve_dlmm_config_account(instruction_name, parsed_json.as_ref(), &accounts);
if instruction_kind == MeteoraDlmmInstructionKind::CreatePool {
+ if token_a_mint.is_none() || token_b_mint.is_none() {
+ continue;
+ }
let payload_json = serde_json::json!({
"decoder": "meteora_dlmm",
"eventKind": "create_pool",
@@ -293,6 +376,9 @@ impl MeteoraDlmmDecoder {
continue;
}
if instruction_kind == MeteoraDlmmInstructionKind::Swap {
+ if token_a_mint.is_none() || token_b_mint.is_none() {
+ continue;
+ }
let reserve_x_account = resolve_dlmm_reserve_x_account(
instruction_name,
parsed_json.as_ref(),
@@ -357,6 +443,127 @@ impl MeteoraDlmmDecoder {
payload_json,
},
));
+ continue;
+ }
+ if instruction_kind == MeteoraDlmmInstructionKind::LiquidityAdd
+ || instruction_kind == MeteoraDlmmInstructionKind::LiquidityRemove
+ || instruction_kind == MeteoraDlmmInstructionKind::PositionOpen
+ {
+ let event_kind = format!("meteora_dlmm.{}", instruction_name.as_str());
+ let actor_wallet =
+ resolve_dlmm_actor_wallet(instruction_name, parsed_json.as_ref(), &accounts);
+ let base_amount_raw = extract_amount_string_by_candidate_keys(
+ parsed_json.as_ref(),
+ &[
+ "baseAmountRaw",
+ "base_amount_raw",
+ "tokenXAmount",
+ "token_x_amount",
+ "amountX",
+ "amount_x",
+ ],
+ );
+ let quote_amount_raw = extract_amount_string_by_candidate_keys(
+ parsed_json.as_ref(),
+ &[
+ "quoteAmountRaw",
+ "quote_amount_raw",
+ "tokenYAmount",
+ "token_y_amount",
+ "amountY",
+ "amount_y",
+ ],
+ );
+ let liquidity_amount_raw = extract_amount_string_by_candidate_keys(
+ parsed_json.as_ref(),
+ &[
+ "liquidity",
+ "liquidityAmount",
+ "liquidity_amount",
+ "binLiquidity",
+ "bin_liquidity",
+ ],
+ );
+ let payload_json = serde_json::json!({
+ "decoder": "meteora_dlmm",
+ "eventKind": instruction_name.as_str(),
+ "decodedInstructionName": instruction_name.as_str(),
+ "dataDiscriminatorHex": instruction_data
+ .as_ref()
+ .and_then(|data| return first_8_bytes_hex(data.as_slice())),
+ "classifiedInstructionKind": crate::classify_dex_event_lifecycle_kind_code(event_kind.as_str()),
+ "signature": transaction.signature,
+ "instructionId": instruction_id,
+ "parentInstructionId": instruction.parent_instruction_id,
+ "instructionIndex": instruction.instruction_index,
+ "innerInstructionIndex": instruction.inner_instruction_index,
+ "stackHeight": instruction.stack_height,
+ "accounts": accounts,
+ "parsed": parsed_json,
+ "logMessages": log_messages,
+ "poolAccount": pool_account,
+ "tokenAMint": token_a_mint,
+ "tokenBMint": token_b_mint,
+ "actorWallet": actor_wallet,
+ "baseAmountRaw": base_amount_raw,
+ "quoteAmountRaw": quote_amount_raw,
+ "liquidityAmountRaw": liquidity_amount_raw
+ });
+ decoded_events.push(crate::MeteoraDlmmDecodedEvent::Liquidity(
+ crate::MeteoraDlmmLiquidityDecoded {
+ transaction_id,
+ instruction_id,
+ signature: transaction.signature.clone(),
+ program_id: program_id.to_string(),
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ actor_wallet,
+ base_amount_raw,
+ quote_amount_raw,
+ liquidity_amount_raw,
+ payload_json,
+ },
+ ));
+ continue;
+ }
+ if instruction_kind == MeteoraDlmmInstructionKind::PoolLifecycle {
+ let event_kind = format!("meteora_dlmm.{}", instruction_name.as_str());
+ let payload_json = serde_json::json!({
+ "decoder": "meteora_dlmm",
+ "eventKind": instruction_name.as_str(),
+ "decodedInstructionName": instruction_name.as_str(),
+ "dataDiscriminatorHex": instruction_data
+ .as_ref()
+ .and_then(|data| return first_8_bytes_hex(data.as_slice())),
+ "classifiedInstructionKind": crate::classify_dex_event_lifecycle_kind_code(event_kind.as_str()),
+ "signature": transaction.signature,
+ "instructionId": instruction_id,
+ "parentInstructionId": instruction.parent_instruction_id,
+ "instructionIndex": instruction.instruction_index,
+ "innerInstructionIndex": instruction.inner_instruction_index,
+ "stackHeight": instruction.stack_height,
+ "accounts": accounts,
+ "parsed": parsed_json,
+ "logMessages": log_messages,
+ "poolAccount": pool_account,
+ "tokenAMint": token_a_mint,
+ "tokenBMint": token_b_mint
+ });
+ decoded_events.push(crate::MeteoraDlmmDecodedEvent::PoolLifecycle(
+ crate::MeteoraDlmmPoolLifecycleDecoded {
+ transaction_id,
+ instruction_id,
+ signature: transaction.signature.clone(),
+ program_id: program_id.to_string(),
+ event_kind,
+ pool_account,
+ token_a_mint,
+ token_b_mint,
+ payload_json,
+ },
+ ));
}
}
return Ok(decoded_events);
@@ -382,6 +589,18 @@ fn classify_instruction_name(
if contains_swap_hint(parsed_type) {
return MeteoraDlmmInstructionName::Swap;
}
+ if contains_add_liquidity_hint(parsed_type) {
+ return MeteoraDlmmInstructionName::AddLiquidity;
+ }
+ if contains_remove_liquidity_hint(parsed_type) {
+ return MeteoraDlmmInstructionName::RemoveLiquidity;
+ }
+ if contains_initialize_position_hint(parsed_type) {
+ return MeteoraDlmmInstructionName::InitializePosition;
+ }
+ if contains_initialize_bin_array_hint(parsed_type) {
+ return MeteoraDlmmInstructionName::InitializeBinArray;
+ }
if parsed_type.is_some() {
return MeteoraDlmmInstructionName::Unknown;
}
@@ -392,6 +611,18 @@ fn classify_instruction_name(
if contains_swap_hint_in_value(parsed_json) {
return MeteoraDlmmInstructionName::Swap;
}
+ if contains_add_liquidity_hint_in_value(parsed_json) {
+ return MeteoraDlmmInstructionName::AddLiquidity;
+ }
+ if contains_remove_liquidity_hint_in_value(parsed_json) {
+ return MeteoraDlmmInstructionName::RemoveLiquidity;
+ }
+ if contains_initialize_position_hint_in_value(parsed_json) {
+ return MeteoraDlmmInstructionName::InitializePosition;
+ }
+ if contains_initialize_bin_array_hint_in_value(parsed_json) {
+ return MeteoraDlmmInstructionName::InitializeBinArray;
+ }
return MeteoraDlmmInstructionName::Unknown;
}
for log_message in log_messages {
@@ -401,6 +632,18 @@ fn classify_instruction_name(
if contains_swap_hint(Some(log_message.as_str())) {
return MeteoraDlmmInstructionName::Swap;
}
+ if contains_add_liquidity_hint(Some(log_message.as_str())) {
+ return MeteoraDlmmInstructionName::AddLiquidity;
+ }
+ if contains_remove_liquidity_hint(Some(log_message.as_str())) {
+ return MeteoraDlmmInstructionName::RemoveLiquidity;
+ }
+ if contains_initialize_position_hint(Some(log_message.as_str())) {
+ return MeteoraDlmmInstructionName::InitializePosition;
+ }
+ if contains_initialize_bin_array_hint(Some(log_message.as_str())) {
+ return MeteoraDlmmInstructionName::InitializeBinArray;
+ }
}
return MeteoraDlmmInstructionName::Unknown;
}
@@ -452,6 +695,15 @@ fn classify_instruction_name_from_data(
if discriminator == DLMM_DISCRIMINATOR_SWAP_WITH_PRICE_IMPACT {
return MeteoraDlmmInstructionName::SwapWithPriceImpact;
}
+ if discriminator == DLMM_DISCRIMINATOR_INITIALIZE_BIN_ARRAY {
+ return MeteoraDlmmInstructionName::InitializeBinArray;
+ }
+ if discriminator == DLMM_DISCRIMINATOR_ADD_LIQUIDITY {
+ return MeteoraDlmmInstructionName::AddLiquidity;
+ }
+ if discriminator == DLMM_DISCRIMINATOR_REMOVE_LIQUIDITY {
+ return MeteoraDlmmInstructionName::RemoveLiquidity;
+ }
if discriminator == DLMM_DISCRIMINATOR_CLAIM_FEE2 {
return MeteoraDlmmInstructionName::ClaimFee2;
}
@@ -490,12 +742,19 @@ fn resolve_dlmm_pool_account(
| MeteoraDlmmInstructionName::Swap2
| MeteoraDlmmInstructionName::SwapExactOut
| MeteoraDlmmInstructionName::SwapExactOut2
- | MeteoraDlmmInstructionName::SwapWithPriceImpact => {
+ | MeteoraDlmmInstructionName::SwapWithPriceImpact
+ | MeteoraDlmmInstructionName::InitializeBinArray => {
return extract_account(accounts, 0);
},
- MeteoraDlmmInstructionName::ClaimFee2
- | MeteoraDlmmInstructionName::InitializePosition
- | MeteoraDlmmInstructionName::Unknown => return None,
+ MeteoraDlmmInstructionName::AddLiquidity | MeteoraDlmmInstructionName::RemoveLiquidity => {
+ return extract_account(accounts, 1);
+ },
+ MeteoraDlmmInstructionName::InitializePosition => {
+ return extract_account(accounts, 2);
+ },
+ MeteoraDlmmInstructionName::ClaimFee2 | MeteoraDlmmInstructionName::Unknown => {
+ return None;
+ },
}
}
@@ -535,7 +794,11 @@ fn resolve_dlmm_token_x_mint(
| MeteoraDlmmInstructionName::SwapWithPriceImpact => {
return extract_account(accounts, 6);
},
+ MeteoraDlmmInstructionName::AddLiquidity | MeteoraDlmmInstructionName::RemoveLiquidity => {
+ return extract_account(accounts, 7);
+ },
MeteoraDlmmInstructionName::ClaimFee2
+ | MeteoraDlmmInstructionName::InitializeBinArray
| MeteoraDlmmInstructionName::InitializePosition
| MeteoraDlmmInstructionName::Unknown => return None,
}
@@ -577,7 +840,11 @@ fn resolve_dlmm_token_y_mint(
| MeteoraDlmmInstructionName::SwapWithPriceImpact => {
return extract_account(accounts, 7);
},
+ MeteoraDlmmInstructionName::AddLiquidity | MeteoraDlmmInstructionName::RemoveLiquidity => {
+ return extract_account(accounts, 8);
+ },
MeteoraDlmmInstructionName::ClaimFee2
+ | MeteoraDlmmInstructionName::InitializeBinArray
| MeteoraDlmmInstructionName::InitializePosition
| MeteoraDlmmInstructionName::Unknown => return None,
}
@@ -715,6 +982,86 @@ fn resolve_dlmm_config_account(
}
}
+fn resolve_dlmm_actor_wallet(
+ instruction_name: MeteoraDlmmInstructionName,
+ parsed_json: std::option::Option<&serde_json::Value>,
+ accounts: &[std::string::String],
+) -> std::option::Option {
+ let parsed_value = extract_string_by_candidate_keys(
+ parsed_json,
+ &["owner", "payer", "sender", "user", "authority", "liquidityProvider"],
+ );
+ if parsed_value.is_some() {
+ return parsed_value;
+ }
+ match instruction_name {
+ MeteoraDlmmInstructionName::AddLiquidity | MeteoraDlmmInstructionName::RemoveLiquidity => {
+ return extract_account(accounts, 9);
+ },
+ MeteoraDlmmInstructionName::InitializePosition => {
+ return extract_account(accounts, 3);
+ },
+ _ => return None,
+ }
+}
+
+fn extract_amount_string_by_candidate_keys(
+ value: std::option::Option<&serde_json::Value>,
+ candidate_keys: &[&str],
+) -> std::option::Option {
+ let value = match value {
+ Some(value) => value,
+ None => return None,
+ };
+ if let Some(text) = extract_string_by_candidate_keys(Some(value), candidate_keys) {
+ return Some(text);
+ }
+ return extract_number_by_candidate_keys(Some(value), candidate_keys);
+}
+
+fn extract_number_by_candidate_keys(
+ value: std::option::Option<&serde_json::Value>,
+ candidate_keys: &[&str],
+) -> std::option::Option {
+ let value = match value {
+ Some(value) => value,
+ None => return None,
+ };
+ match value {
+ serde_json::Value::Object(object) => {
+ for candidate_key in candidate_keys {
+ if let Some(candidate) = object.get(*candidate_key) {
+ if let Some(number) = candidate.as_i64() {
+ return Some(number.to_string());
+ }
+ if let Some(number) = candidate.as_u64() {
+ return Some(number.to_string());
+ }
+ if let Some(number) = candidate.as_f64() {
+ return Some(number.to_string());
+ }
+ }
+ }
+ for nested in object.values() {
+ let result = extract_number_by_candidate_keys(Some(nested), candidate_keys);
+ if result.is_some() {
+ return result;
+ }
+ }
+ },
+ serde_json::Value::Array(values) => {
+ for nested in values {
+ let result = extract_number_by_candidate_keys(Some(nested), candidate_keys);
+ if result.is_some() {
+ return result;
+ }
+ }
+ },
+ _ => {},
+ }
+ return None;
+}
+
fn contains_create_pool_hint(value: std::option::Option<&str>) -> bool {
let value = match value {
Some(value) => value.to_ascii_lowercase(),
@@ -755,6 +1102,62 @@ fn contains_swap_hint(value: std::option::Option<&str>) -> bool {
return false;
}
+fn contains_add_liquidity_hint(value: std::option::Option<&str>) -> bool {
+ let value = match value {
+ Some(value) => value.to_ascii_lowercase(),
+ None => return false,
+ };
+ if value.contains("addliquidity") {
+ return true;
+ }
+ if value.contains("add_liquidity") {
+ return true;
+ }
+ return false;
+}
+
+fn contains_remove_liquidity_hint(value: std::option::Option<&str>) -> bool {
+ let value = match value {
+ Some(value) => value.to_ascii_lowercase(),
+ None => return false,
+ };
+ if value.contains("removeliquidity") {
+ return true;
+ }
+ if value.contains("remove_liquidity") {
+ return true;
+ }
+ return false;
+}
+
+fn contains_initialize_position_hint(value: std::option::Option<&str>) -> bool {
+ let value = match value {
+ Some(value) => value.to_ascii_lowercase(),
+ None => return false,
+ };
+ if value.contains("initializeposition") {
+ return true;
+ }
+ if value.contains("initialize_position") {
+ return true;
+ }
+ return false;
+}
+
+fn contains_initialize_bin_array_hint(value: std::option::Option<&str>) -> bool {
+ let value = match value {
+ Some(value) => value.to_ascii_lowercase(),
+ None => return false,
+ };
+ if value.contains("initializebinarray") {
+ return true;
+ }
+ if value.contains("initialize_bin_array") {
+ return true;
+ }
+ return false;
+}
+
fn contains_create_pool_hint_in_value(value: &serde_json::Value) -> bool {
return contains_string_hint_in_value(value, contains_create_pool_hint);
}
@@ -763,6 +1166,22 @@ fn contains_swap_hint_in_value(value: &serde_json::Value) -> bool {
return contains_string_hint_in_value(value, contains_swap_hint);
}
+fn contains_add_liquidity_hint_in_value(value: &serde_json::Value) -> bool {
+ return contains_string_hint_in_value(value, contains_add_liquidity_hint);
+}
+
+fn contains_remove_liquidity_hint_in_value(value: &serde_json::Value) -> bool {
+ return contains_string_hint_in_value(value, contains_remove_liquidity_hint);
+}
+
+fn contains_initialize_position_hint_in_value(value: &serde_json::Value) -> bool {
+ return contains_string_hint_in_value(value, contains_initialize_position_hint);
+}
+
+fn contains_initialize_bin_array_hint_in_value(value: &serde_json::Value) -> bool {
+ return contains_string_hint_in_value(value, contains_initialize_bin_array_hint);
+}
+
fn contains_string_hint_in_value(
value: &serde_json::Value,
predicate: fn(std::option::Option<&str>) -> bool,
@@ -1118,6 +1537,10 @@ mod tests {
crate::MeteoraDlmmDecodedEvent::Swap(_) => {
panic!("unexpected swap event");
},
+ crate::MeteoraDlmmDecodedEvent::Liquidity(_)
+ | crate::MeteoraDlmmDecodedEvent::PoolLifecycle(_) => {
+ panic!("unexpected non-trade event");
+ },
}
}
@@ -1144,11 +1567,15 @@ mod tests {
crate::MeteoraDlmmDecodedEvent::CreatePool(_) => {
panic!("unexpected create event");
},
+ crate::MeteoraDlmmDecodedEvent::Liquidity(_)
+ | crate::MeteoraDlmmDecodedEvent::PoolLifecycle(_) => {
+ panic!("unexpected non-trade event");
+ },
}
}
#[test]
- fn meteora_dlmm_ignores_unclear_instruction() {
+ fn meteora_dlmm_initialize_bin_array_hint_is_decoded_as_pool_lifecycle() {
let decoder = crate::MeteoraDlmmDecoder::new();
let transaction = make_swap_transaction();
let mut instruction = make_swap_instruction();
@@ -1159,7 +1586,14 @@ mod tests {
Ok(decoded) => decoded,
Err(error) => panic!("decode must succeed: {}", error),
};
- assert_eq!(decoded.len(), 0);
+ assert_eq!(decoded.len(), 1);
+ match &decoded[0] {
+ crate::MeteoraDlmmDecodedEvent::PoolLifecycle(event) => {
+ assert_eq!(event.event_kind, "meteora_dlmm.initialize_bin_array");
+ assert_eq!(event.pool_account, Some("DlmmPairSwap111".to_string()));
+ },
+ _ => panic!("expected pool lifecycle event"),
+ }
}
#[test]
@@ -1294,17 +1728,89 @@ mod tests {
crate::MeteoraDlmmDecodedEvent::CreatePool(_) => {
panic!("unexpected create event");
},
+ crate::MeteoraDlmmDecodedEvent::Liquidity(_)
+ | crate::MeteoraDlmmDecodedEvent::PoolLifecycle(_) => {
+ panic!("unexpected non-trade event");
+ },
}
}
#[test]
- fn meteora_dlmm_initialize_position_discriminator_is_ignored() {
+ fn meteora_dlmm_initialize_position_discriminator_is_non_trade_position_open() {
let instruction_data = [0xdb, 0xc0, 0xea, 0x47, 0xbe, 0xbf, 0x66, 0x50, 0x01, 0x02, 0x03];
let log_messages = vec!["Program log: Instruction: Swap".to_string()];
let name =
super::classify_instruction_name(None, None, Some(&instruction_data), &log_messages);
assert_eq!(name, super::MeteoraDlmmInstructionName::InitializePosition);
- assert_eq!(name.kind(), super::MeteoraDlmmInstructionKind::Ignore);
+ assert_eq!(name.kind(), super::MeteoraDlmmInstructionKind::PositionOpen);
+ }
+
+ #[test]
+ fn meteora_dlmm_add_remove_liquidity_discriminators_are_non_trade_liquidity() {
+ let add_data = [0xb5, 0x9d, 0x59, 0x43, 0x8f, 0xb6, 0x34, 0x48, 0x01];
+ let remove_data = [0x50, 0x55, 0xd1, 0x48, 0x18, 0xce, 0xb1, 0x6c, 0x01];
+ let add_name = super::classify_instruction_name_from_data(Some(&add_data));
+ let remove_name = super::classify_instruction_name_from_data(Some(&remove_data));
+ assert_eq!(add_name, super::MeteoraDlmmInstructionName::AddLiquidity);
+ assert_eq!(add_name.kind(), super::MeteoraDlmmInstructionKind::LiquidityAdd);
+ assert_eq!(remove_name, super::MeteoraDlmmInstructionName::RemoveLiquidity);
+ assert_eq!(remove_name.kind(), super::MeteoraDlmmInstructionKind::LiquidityRemove);
+ }
+
+ #[test]
+ fn meteora_dlmm_initialize_bin_array_is_pool_lifecycle() {
+ let data = [0x23, 0x56, 0x13, 0xb9, 0x4e, 0xd4, 0x4b, 0xd3, 0x01];
+ let name = super::classify_instruction_name_from_data(Some(&data));
+ assert_eq!(name, super::MeteoraDlmmInstructionName::InitializeBinArray);
+ assert_eq!(name.kind(), super::MeteoraDlmmInstructionKind::PoolLifecycle);
+ }
+
+ #[test]
+ fn meteora_dlmm_add_liquidity_is_decoded_as_non_trade_event() {
+ let decoder = crate::MeteoraDlmmDecoder::new();
+ let transaction = make_swap_transaction();
+ let mut instruction = crate::ChainInstructionDto::new(
+ 403,
+ None,
+ 0,
+ None,
+ Some(crate::METEORA_DLMM_PROGRAM_ID.to_string()),
+ Some("meteora-dlmm".to_string()),
+ Some(1),
+ serde_json::json!([
+ "Position111",
+ "DlmmPairSwap111",
+ "Bitmap111",
+ "UserTokenX111",
+ "UserTokenY111",
+ "ReserveX111",
+ "ReserveY111",
+ "DlmmSwapTokenX111",
+ crate::WSOL_MINT_ID,
+ "Owner111"
+ ])
+ .to_string(),
+ Some("\"3K5citUwVB6uv\"".to_string()),
+ None,
+ None,
+ );
+ instruction.id = Some(405);
+ let decoded_result = decoder.decode_transaction(&transaction, &[instruction]);
+ let decoded = match decoded_result {
+ Ok(decoded) => decoded,
+ Err(error) => panic!("decode must succeed: {}", error),
+ };
+ assert_eq!(decoded.len(), 1);
+ match &decoded[0] {
+ crate::MeteoraDlmmDecodedEvent::Liquidity(event) => {
+ assert_eq!(event.event_kind, "meteora_dlmm.add_liquidity");
+ assert_eq!(event.pool_account, Some("DlmmPairSwap111".to_string()));
+ assert_eq!(event.token_a_mint, Some("DlmmSwapTokenX111".to_string()));
+ assert_eq!(event.token_b_mint, Some(crate::WSOL_MINT_ID.to_string()));
+ assert_eq!(event.actor_wallet, Some("Owner111".to_string()));
+ },
+ _ => panic!("expected liquidity event"),
+ }
}
#[test]
diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs
index f70ff56..d15e821 100644
--- a/kb_lib/src/dex_decode.rs
+++ b/kb_lib/src/dex_decode.rs
@@ -407,6 +407,42 @@ impl DexDecodeService {
)
.await;
},
+ crate::MeteoraDlmmDecodedEvent::Liquidity(event) => {
+ return self
+ .materialize_named_dex_event(
+ transaction,
+ event.transaction_id,
+ event.instruction_id,
+ "meteora_dlmm",
+ event.program_id.clone(),
+ event.event_kind.as_str(),
+ event.pool_account.clone(),
+ None,
+ event.token_a_mint.clone(),
+ event.token_b_mint.clone(),
+ None,
+ event.payload_json.clone(),
+ )
+ .await;
+ },
+ crate::MeteoraDlmmDecodedEvent::PoolLifecycle(event) => {
+ return self
+ .materialize_named_dex_event(
+ transaction,
+ event.transaction_id,
+ event.instruction_id,
+ "meteora_dlmm",
+ event.program_id.clone(),
+ event.event_kind.as_str(),
+ event.pool_account.clone(),
+ None,
+ event.token_a_mint.clone(),
+ event.token_b_mint.clone(),
+ None,
+ event.payload_json.clone(),
+ )
+ .await;
+ },
}
}
diff --git a/kb_lib/src/dex_event_classification.rs b/kb_lib/src/dex_event_classification.rs
index 20b1927..ed22bdc 100644
--- a/kb_lib/src/dex_event_classification.rs
+++ b/kb_lib/src/dex_event_classification.rs
@@ -290,12 +290,21 @@ pub fn is_dex_liquidity_event_kind(event_kind: &str) -> bool {
if event_kind.contains(".withdraw") {
return true;
}
+ if event_kind.contains(".add_liquidity") {
+ return true;
+ }
+ if event_kind.contains(".remove_liquidity") {
+ return true;
+ }
if event_kind.contains(".increase_liquidity") {
return true;
}
if event_kind.contains(".decrease_liquidity") {
return true;
}
+ if event_kind.contains(".initialize_position") {
+ return true;
+ }
if event_kind.contains(".open_position") {
return true;
}
@@ -335,6 +344,9 @@ pub fn is_dex_liquidity_remove_event_kind(event_kind: &str) -> bool {
/// Returns true for concentrated-liquidity position open events.
pub fn is_dex_position_open_event_kind(event_kind: &str) -> bool {
+ if event_kind.contains(".initialize_position") {
+ return true;
+ }
if event_kind.contains(".open_position") {
return true;
}
@@ -379,6 +391,9 @@ pub fn is_dex_reward_event_kind(event_kind: &str) -> bool {
/// Returns true for pool, pair, launch, mint, burn or migration lifecycle events.
pub fn is_dex_pool_lifecycle_event_kind(event_kind: &str) -> bool {
+ if event_kind.contains(".initialize_bin_array") {
+ return true;
+ }
if is_dex_pool_creation_event_kind(event_kind) {
return true;
}
@@ -452,6 +467,12 @@ pub fn is_dex_migration_event_kind(event_kind: &str) -> bool {
/// Returns true for pool creation or initialization events.
pub fn is_dex_pool_creation_event_kind(event_kind: &str) -> bool {
+ if event_kind.contains(".initialize_position") {
+ return false;
+ }
+ if event_kind.contains(".initialize_bin_array") {
+ return true;
+ }
if event_kind.contains(".initialize") {
return true;
}
@@ -995,4 +1016,44 @@ mod tests {
});
assert!(!super::decoded_payload_has_trade_amount_or_price_payload(&empty_payload_json));
}
+
+ #[test]
+ fn classifies_dlmm_add_remove_liquidity_and_positions_as_non_trade_useful() {
+ assert_eq!(
+ super::classify_dex_event_category_code("meteora_dlmm.add_liquidity"),
+ "liquidity"
+ );
+ assert_eq!(
+ super::classify_dex_event_category_code("meteora_dlmm.remove_liquidity"),
+ "liquidity"
+ );
+ assert_eq!(
+ super::classify_dex_event_lifecycle_kind_code("meteora_dlmm.initialize_position"),
+ "position_open"
+ );
+ assert_eq!(
+ super::classify_dex_event_actionability_code(
+ "meteora_dlmm.add_liquidity",
+ false,
+ false,
+ ),
+ "non_trade_useful"
+ );
+ }
+
+ #[test]
+ fn classifies_dlmm_bin_array_initialization_as_pool_lifecycle() {
+ assert_eq!(
+ super::classify_dex_event_category_code("meteora_dlmm.initialize_bin_array"),
+ "pool_lifecycle"
+ );
+ assert_eq!(
+ super::classify_dex_event_actionability_code(
+ "meteora_dlmm.initialize_bin_array",
+ false,
+ false,
+ ),
+ "non_trade_useful"
+ );
+ }
}
diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs
index 061b15a..7e6a08c 100644
--- a/kb_lib/src/lib.rs
+++ b/kb_lib/src/lib.rs
@@ -59,6 +59,8 @@ mod local_pipeline_diagnostics;
mod local_pipeline_replay;
/// Local pipeline validation helpers for non-regression runs.
mod local_pipeline_validation;
+/// Useful non-trade DEX event materialization service.
+mod non_trade_event_materialization;
/// Pair analytic signal service.
mod pair_analytic_signal;
/// Pair-candle aggregation service.
@@ -403,6 +405,10 @@ pub use db::PoolDto;
pub use db::PoolEntity;
/// Normalized pool kind.
pub use db::PoolKind;
+/// Application-facing normalized pool lifecycle event DTO.
+pub use db::PoolLifecycleEventDto;
+/// Persisted normalized pool lifecycle event row.
+pub use db::PoolLifecycleEventEntity;
/// Application-facing normalized pool listing DTO.
pub use db::PoolListingDto;
/// Persisted normalized pool listing row.
@@ -625,6 +631,12 @@ pub use db::query_pairs_list;
pub use db::query_pairs_update_symbol;
/// Inserts or updates one normalized pair row by pool id.
pub use db::query_pairs_upsert;
+/// Returns one pool lifecycle event by decoded event id.
+pub use db::query_pool_lifecycle_events_get_by_decoded_event_id;
+/// Lists recent pool lifecycle events ordered from newest to oldest.
+pub use db::query_pool_lifecycle_events_list_recent;
+/// Inserts or updates one normalized pool lifecycle event row.
+pub use db::query_pool_lifecycle_events_upsert;
/// Reads one normalized pool listing row by pool id.
pub use db::query_pool_listings_get_by_pool_id;
/// Lists normalized pool listings ordered by detected date then id.
@@ -795,6 +807,10 @@ pub use dex::MeteoraDlmmCreatePoolDecoded;
pub use dex::MeteoraDlmmDecodedEvent;
/// Meteora DLMM decoder.
pub use dex::MeteoraDlmmDecoder;
+/// Decoded Meteora DLMM liquidity lifecycle event.
+pub use dex::MeteoraDlmmLiquidityDecoded;
+/// Decoded Meteora DLMM pool lifecycle event.
+pub use dex::MeteoraDlmmPoolLifecycleDecoded;
/// Decoded Meteora DLMM swap event.
pub use dex::MeteoraDlmmSwapDecoded;
/// Decoded Orca Whirlpools create-pool event.
@@ -1123,3 +1139,8 @@ pub use ws_manager::WsManagedEndpointSnapshot;
pub use ws_manager::WsManager;
/// Snapshot of the whole manager state.
pub use ws_manager::WsManagerSnapshot;
+
+/// Result of non-trade event materialization for one transaction.
+pub use non_trade_event_materialization::NonTradeEventMaterializationResult;
+/// Materializes useful non-trade decoded DEX events.
+pub use non_trade_event_materialization::NonTradeEventMaterializationService;
diff --git a/kb_lib/src/local_pipeline_diagnostics.rs b/kb_lib/src/local_pipeline_diagnostics.rs
index 81da000..dbb429e 100644
--- a/kb_lib/src/local_pipeline_diagnostics.rs
+++ b/kb_lib/src/local_pipeline_diagnostics.rs
@@ -152,6 +152,8 @@ impl LocalPipelineDiagnosticsService {
decoded_non_actionable_trade_event_count: counters
.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: counters.decoded_unknown_event_count,
+ liquidity_event_count: counters.liquidity_event_count,
+ pool_lifecycle_event_count: counters.pool_lifecycle_event_count,
diagnostics_clean,
blocking_issue_count,
missing_trade_event_count: counters.missing_trade_event_count,
diff --git a/kb_lib/src/local_pipeline_replay.rs b/kb_lib/src/local_pipeline_replay.rs
index cf4b331..1179798 100644
--- a/kb_lib/src/local_pipeline_replay.rs
+++ b/kb_lib/src/local_pipeline_replay.rs
@@ -45,6 +45,8 @@ pub struct LocalPipelineReplayResult {
pub detect_error_count: usize,
/// Number of transactions that produced a trade aggregation error.
pub trade_aggregation_error_count: usize,
+ /// Number of transactions that produced a non-trade materialization error.
+ pub non_trade_materialization_error_count: usize,
/// Number of transactions that produced a candle aggregation error.
pub pair_candle_error_count: usize,
/// Number of transactions that produced an analytic signal error.
@@ -55,6 +57,10 @@ pub struct LocalPipelineReplayResult {
pub detection_count: usize,
/// Total trade aggregation results returned by replayed aggregation calls.
pub trade_event_count: usize,
+ /// Total liquidity event materialization results returned by replayed non-trade calls.
+ pub liquidity_event_count: usize,
+ /// Total pool lifecycle event materialization results returned by replayed non-trade calls.
+ pub pool_lifecycle_event_count: usize,
/// Total candle upsert results returned by replayed candle calls.
///
/// This is a replay write/result counter, not the number of distinct rows
@@ -141,6 +147,8 @@ impl LocalPipelineReplayService {
let dex_decode = crate::DexDecodeService::new(self.database.clone());
let dex_detect = crate::DexDetectService::new(self.database.clone());
let trade_aggregation = crate::TradeAggregationService::new(self.database.clone());
+ let non_trade_materialization =
+ crate::NonTradeEventMaterializationService::new(self.database.clone());
let pair_candle_aggregation =
crate::PairCandleAggregationService::new(self.database.clone());
let pair_analytic_signal = crate::PairAnalyticSignalService::new(self.database.clone());
@@ -187,6 +195,25 @@ impl LocalPipelineReplayService {
);
},
}
+ let non_trade_result = non_trade_materialization
+ .record_transaction_by_signature(signature.as_str())
+ .await;
+ match non_trade_result {
+ Ok(non_trade_result) => {
+ result.liquidity_event_count += non_trade_result.liquidity_event_count;
+ result.pool_lifecycle_event_count +=
+ non_trade_result.pool_lifecycle_event_count;
+ },
+ Err(error) => {
+ result.non_trade_materialization_error_count += 1;
+ tracing::warn!(
+ signature = %signature,
+ error = %error,
+ "local pipeline replay non-trade materialization step failed"
+ );
+ continue;
+ },
+ }
let trade_result =
trade_aggregation.record_transaction_by_signature(signature.as_str()).await;
match trade_result {
diff --git a/kb_lib/src/local_pipeline_validation.rs b/kb_lib/src/local_pipeline_validation.rs
index ac00f9d..7d98498 100644
--- a/kb_lib/src/local_pipeline_validation.rs
+++ b/kb_lib/src/local_pipeline_validation.rs
@@ -208,6 +208,22 @@ impl LocalPipelineValidationConfig {
config.require_pair_trading_readiness_semantics = true;
return config;
}
+
+ /// Builds the `0.7.34` non-trade liquidity/lifecycle validation config.
+ ///
+ /// This profile keeps the `0.7.33` checks and exposes the first materialized
+ /// non-trade tables without allowing them to affect trade/candle validation.
+ ///
+ /// Unlike the full replay profiles, this profile accepts partial databases
+ /// produced by targeted pool backfills. Missing expected DEXes are reported
+ /// as warnings instead of blocking issues.
+ pub fn v0_7_34_non_trade_liquidity_lifecycle() -> Self {
+ let mut config = Self::v0_7_33_pair_trading_readiness();
+ config.profile_code = "0.7.34_non_trade_liquidity_lifecycle".to_string();
+ config.require_all_expected_dexes = false;
+ config.allow_unexpected_dexes = true;
+ return config;
+ }
}
/// A single local pipeline validation issue.
@@ -246,6 +262,10 @@ pub struct LocalPipelineValidationReportDto {
pub decoded_non_actionable_trade_event_count: i64,
/// Total decoded events with unknown classification.
pub decoded_unknown_event_count: i64,
+ /// Total persisted liquidity events.
+ pub liquidity_event_count: i64,
+ /// Total persisted pool lifecycle events.
+ pub pool_lifecycle_event_count: i64,
/// Number of entries currently exposed by the DEX support matrix.
pub dex_support_matrix_entry_count: i64,
/// DEX support matrix snapshot exposed with the validation report.
@@ -368,6 +388,14 @@ impl LocalPipelineValidationService {
let config = crate::LocalPipelineValidationConfig::v0_7_33_pair_trading_readiness();
return self.validate_current_database(&config).await;
}
+
+ /// Diagnoses the current database with the `0.7.34` non-trade liquidity/lifecycle profile.
+ pub async fn validate_v0_7_34_current_database(
+ &self,
+ ) -> Result {
+ let config = crate::LocalPipelineValidationConfig::v0_7_34_non_trade_liquidity_lifecycle();
+ return self.validate_current_database(&config).await;
+ }
}
/// Validates a diagnostics summary without performing database access.
@@ -485,14 +513,16 @@ pub fn validate_local_pipeline_diagnostics_summary(
if config.require_pair_trading_readiness_semantics {
validate_pair_trading_readiness_semantics(&mut issues, summary);
}
- if config.require_all_expected_dexes {
+ let missing_expected_dex_is_warning =
+ config.profile_code == "0.7.34_non_trade_liquidity_lifecycle";
+ if config.require_all_expected_dexes || missing_expected_dex_is_warning {
for expected_dex_code in &expected_dex_codes {
if !observed_dex_codes.contains(expected_dex_code) {
issues.push(LocalPipelineValidationIssueDto {
code: "expected_dex_missing".to_string(),
message: format!("expected DEX '{}' is missing", expected_dex_code),
subject: Some(expected_dex_code.clone()),
- blocking: true,
+ blocking: config.require_all_expected_dexes,
});
}
}
@@ -554,6 +584,8 @@ pub fn validate_local_pipeline_diagnostics_summary(
decoded_non_trade_useful_event_count: summary.decoded_non_trade_useful_event_count,
decoded_non_actionable_trade_event_count: summary.decoded_non_actionable_trade_event_count,
decoded_unknown_event_count: summary.decoded_unknown_event_count,
+ liquidity_event_count: summary.liquidity_event_count,
+ pool_lifecycle_event_count: summary.pool_lifecycle_event_count,
dex_support_matrix_entry_count: crate::dex_support_matrix_entries().len() as i64,
dex_support_matrix: crate::dex_support_matrix_entry_dtos(),
issues,
@@ -762,6 +794,8 @@ mod tests {
decoded_non_trade_useful_event_count: 0,
decoded_non_actionable_trade_event_count: 0,
decoded_unknown_event_count: 0,
+ liquidity_event_count: 0,
+ pool_lifecycle_event_count: 0,
diagnostics_clean: true,
blocking_issue_count: 0,
missing_trade_event_count: 6,
@@ -1061,6 +1095,36 @@ mod tests {
assert_eq!(report.blocking_issue_count, 0);
}
+ #[test]
+ fn validation_accepts_0_7_34_non_trade_liquidity_lifecycle_summary() {
+ let mut summary = make_0_7_28_summary_with_meteora();
+ summary.decoded_non_trade_useful_event_count = 4;
+ summary.liquidity_event_count = 2;
+ summary.pool_lifecycle_event_count = 2;
+ let config = crate::LocalPipelineValidationConfig::v0_7_34_non_trade_liquidity_lifecycle();
+ let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
+ assert!(report.validation_passed);
+ assert_eq!(report.validation_profile_code, "0.7.34_non_trade_liquidity_lifecycle");
+ assert_eq!(report.liquidity_event_count, 2);
+ assert_eq!(report.pool_lifecycle_event_count, 2);
+ }
+
+ #[test]
+ fn validation_accepts_0_7_34_partial_database_without_expected_dex_coverage() {
+ let mut summary = make_0_7_28_summary_with_meteora();
+ summary.dex_summaries.retain(|dex_summary| {
+ return dex_summary.dex_code != "meteora_damm_v1";
+ });
+ let config = crate::LocalPipelineValidationConfig::v0_7_34_non_trade_liquidity_lifecycle();
+ let report = crate::validate_local_pipeline_diagnostics_summary(&summary, &config);
+ assert!(report.validation_passed);
+ assert_eq!(report.blocking_issue_count, 0);
+ assert_eq!(report.warning_count, 1);
+ assert_eq!(report.issues[0].code, "expected_dex_missing");
+ assert_eq!(report.issues[0].subject, Some("meteora_damm_v1".to_string()));
+ assert!(!report.issues[0].blocking);
+ }
+
#[test]
fn validation_rejects_0_7_33_pair_trading_readiness_mismatch() {
let mut summary = make_0_7_28_summary_with_meteora();
diff --git a/kb_lib/src/non_trade_event_materialization.rs b/kb_lib/src/non_trade_event_materialization.rs
new file mode 100644
index 0000000..e59021a
--- /dev/null
+++ b/kb_lib/src/non_trade_event_materialization.rs
@@ -0,0 +1,490 @@
+// file: kb_lib/src/non_trade_event_materialization.rs
+
+//! Materialization of useful non-trade DEX events.
+//!
+//! This service persists liquidity and pool lifecycle events from already
+//! decoded DEX events. It deliberately does not feed trade, metric or candle
+//! materialization.
+
+/// Result of non-trade event materialization for one transaction.
+#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
+pub struct NonTradeEventMaterializationResult {
+ /// Number of liquidity events inserted or refreshed.
+ pub liquidity_event_count: usize,
+ /// Number of pool lifecycle events inserted or refreshed.
+ pub pool_lifecycle_event_count: usize,
+}
+
+/// Materializes useful non-trade decoded DEX events.
+#[derive(Debug, Clone)]
+pub struct NonTradeEventMaterializationService {
+ database: std::sync::Arc,
+}
+
+struct NonTradeDecodedEventContext {
+ dex_id: std::option::Option,
+ pool_id: std::option::Option,
+ pair_id: std::option::Option,
+ pair: std::option::Option,
+}
+
+impl NonTradeEventMaterializationService {
+ /// Creates a new non-trade event materialization service.
+ pub fn new(database: std::sync::Arc) -> Self {
+ return Self { database };
+ }
+
+ /// Materializes useful non-trade events for one persisted transaction signature.
+ pub async fn record_transaction_by_signature(
+ &self,
+ signature: &str,
+ ) -> Result {
+ let transaction_result =
+ crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature)
+ .await;
+ let transaction_option = match transaction_result {
+ Ok(transaction_option) => transaction_option,
+ Err(error) => return Err(error),
+ };
+ let transaction = match transaction_option {
+ Some(transaction) => transaction,
+ None => {
+ return Err(crate::Error::InvalidState(format!(
+ "cannot materialize non-trade events for unknown transaction '{}'",
+ signature
+ )));
+ },
+ };
+ if transaction.err_json.is_some() {
+ tracing::debug!(
+ signature = %transaction.signature,
+ "skipping non-trade materialization for failed transaction"
+ );
+ return Ok(crate::NonTradeEventMaterializationResult::default());
+ }
+ let transaction_id = match transaction.id {
+ Some(transaction_id) => transaction_id,
+ None => {
+ return Err(crate::Error::InvalidState(format!(
+ "transaction '{}' has no internal id",
+ transaction.signature
+ )));
+ },
+ };
+ let decoded_events_result = crate::query_dex_decoded_events_list_by_transaction_id(
+ self.database.as_ref(),
+ transaction_id,
+ )
+ .await;
+ let decoded_events = match decoded_events_result {
+ Ok(decoded_events) => decoded_events,
+ Err(error) => return Err(error),
+ };
+ let mut result = crate::NonTradeEventMaterializationResult::default();
+ for decoded_event in &decoded_events {
+ let payload_result =
+ serde_json::from_str::(decoded_event.payload_json.as_str());
+ let payload = match payload_result {
+ Ok(payload) => payload,
+ Err(error) => {
+ tracing::warn!(
+ signature = %transaction.signature,
+ event_kind = %decoded_event.event_kind,
+ error = %error,
+ "skipping non-trade materialization for invalid decoded payload"
+ );
+ continue;
+ },
+ };
+ if crate::is_dex_liquidity_event_kind(decoded_event.event_kind.as_str()) {
+ let materialized = self
+ .materialize_liquidity_event(
+ &transaction,
+ transaction_id,
+ decoded_event,
+ &payload,
+ )
+ .await;
+ match materialized {
+ Ok(was_materialized) => {
+ if was_materialized {
+ result.liquidity_event_count += 1;
+ }
+ },
+ Err(error) => return Err(error),
+ }
+ }
+ if crate::is_dex_pool_lifecycle_event_kind(decoded_event.event_kind.as_str()) {
+ let materialized = self
+ .materialize_pool_lifecycle_event(&transaction, transaction_id, decoded_event)
+ .await;
+ match materialized {
+ Ok(was_materialized) => {
+ if was_materialized {
+ result.pool_lifecycle_event_count += 1;
+ }
+ },
+ Err(error) => return Err(error),
+ }
+ }
+ }
+ return Ok(result);
+ }
+
+ async fn materialize_pool_lifecycle_event(
+ &self,
+ transaction: &crate::ChainTransactionDto,
+ transaction_id: i64,
+ decoded_event: &crate::DexDecodedEventDto,
+ ) -> Result {
+ let decoded_event_id = match decoded_event.id {
+ Some(decoded_event_id) => decoded_event_id,
+ None => return Ok(false),
+ };
+ let context = self.resolve_decoded_event_context(decoded_event).await;
+ let context = match context {
+ Ok(context) => context,
+ Err(error) => return Err(error),
+ };
+ let dto = crate::PoolLifecycleEventDto::new(
+ transaction_id,
+ Some(decoded_event_id),
+ context.dex_id,
+ context.pool_id,
+ context.pair_id,
+ transaction.signature.clone(),
+ transaction.slot,
+ decoded_event.protocol_name.clone(),
+ decoded_event.program_id.clone(),
+ decoded_event.event_kind.clone(),
+ decoded_event.pool_account.clone(),
+ decoded_event.token_a_mint.clone(),
+ decoded_event.token_b_mint.clone(),
+ decoded_event.payload_json.clone(),
+ );
+ let upsert_result =
+ crate::query_pool_lifecycle_events_upsert(self.database.as_ref(), &dto).await;
+ match upsert_result {
+ Ok(_) => return Ok(true),
+ Err(error) => return Err(error),
+ }
+ }
+
+ async fn materialize_liquidity_event(
+ &self,
+ transaction: &crate::ChainTransactionDto,
+ transaction_id: i64,
+ decoded_event: &crate::DexDecodedEventDto,
+ payload: &serde_json::Value,
+ ) -> Result {
+ let decoded_event_id = match decoded_event.id {
+ Some(decoded_event_id) => decoded_event_id,
+ None => return Ok(false),
+ };
+ let context = self.resolve_decoded_event_context(decoded_event).await;
+ let context = match context {
+ Ok(context) => context,
+ Err(error) => return Err(error),
+ };
+ let dex_id = match context.dex_id {
+ Some(dex_id) => dex_id,
+ None => return Ok(false),
+ };
+ let pool_id = match context.pool_id {
+ Some(pool_id) => pool_id,
+ None => return Ok(false),
+ };
+ let pair = match context.pair {
+ Some(pair) => pair,
+ None => return Ok(false),
+ };
+ let pair_id = match pair.id {
+ Some(pair_id) => Some(pair_id),
+ None => None,
+ };
+ let event_kind = if crate::is_dex_position_open_event_kind(decoded_event.event_kind.as_str()) {
+ crate::LiquidityEventKind::PositionOpen
+ } else if crate::is_dex_position_close_event_kind(decoded_event.event_kind.as_str()) {
+ crate::LiquidityEventKind::PositionClose
+ } else if crate::is_dex_liquidity_remove_event_kind(decoded_event.event_kind.as_str()) {
+ crate::LiquidityEventKind::Remove
+ } else {
+ crate::LiquidityEventKind::Add
+ };
+ let actor_wallet = extract_first_string(
+ payload,
+ &[
+ "actorWallet",
+ "actor_wallet",
+ "user",
+ "owner",
+ "payer",
+ "authority",
+ "liquidityProvider",
+ "liquidity_provider",
+ ],
+ );
+ let base_amount = extract_first_amount_string(
+ payload,
+ &[
+ "baseAmountRaw",
+ "base_amount_raw",
+ "baseAmount",
+ "base_amount",
+ "amountBase",
+ "amount_base",
+ "tokenAAmount",
+ "token_a_amount",
+ "amountA",
+ "amount_a",
+ ],
+ );
+ let quote_amount = extract_first_amount_string(
+ payload,
+ &[
+ "quoteAmountRaw",
+ "quote_amount_raw",
+ "quoteAmount",
+ "quote_amount",
+ "amountQuote",
+ "amount_quote",
+ "tokenBAmount",
+ "token_b_amount",
+ "amountB",
+ "amount_b",
+ ],
+ );
+ let lp_amount = extract_first_amount_string(
+ payload,
+ &[
+ "lpAmountRaw",
+ "lp_amount_raw",
+ "lpAmount",
+ "lp_amount",
+ "liquidity",
+ "liquidityAmount",
+ "liquidity_amount",
+ ],
+ );
+ let amounts_are_complete = base_amount.is_some() && quote_amount.is_some();
+ let base_amount_value = match base_amount {
+ Some(base_amount_value) => base_amount_value,
+ None => "0".to_string(),
+ };
+ let quote_amount_value = match quote_amount {
+ Some(quote_amount_value) => quote_amount_value,
+ None => "0".to_string(),
+ };
+ let dto = crate::LiquidityEventDto::new(
+ dex_id,
+ pool_id,
+ pair_id,
+ transaction.signature.clone(),
+ decoded_event_id,
+ transaction.slot,
+ event_kind,
+ actor_wallet,
+ pair.base_token_id,
+ pair.quote_token_id,
+ None,
+ base_amount_value,
+ quote_amount_value,
+ lp_amount,
+ )
+ .with_decoded_event_metadata(
+ Some(transaction_id),
+ Some(decoded_event_id),
+ Some(decoded_event.program_id.clone()),
+ Some(decoded_event.event_kind.clone()),
+ Some(decoded_event.payload_json.clone()),
+ amounts_are_complete,
+ );
+ let upsert_result =
+ crate::query_liquidity_events_upsert(self.database.as_ref(), &dto).await;
+ match upsert_result {
+ Ok(_) => return Ok(true),
+ Err(error) => return Err(error),
+ }
+ }
+
+ async fn resolve_decoded_event_context(
+ &self,
+ decoded_event: &crate::DexDecodedEventDto,
+ ) -> Result {
+ let dex_result = crate::query_dexs_get_by_code(
+ self.database.as_ref(),
+ decoded_event.protocol_name.as_str(),
+ )
+ .await;
+ let dex_id = match dex_result {
+ Ok(Some(dex)) => dex.id,
+ Ok(None) => None,
+ Err(error) => return Err(error),
+ };
+ let pool_address = match decoded_event.pool_account.clone() {
+ Some(pool_address) => pool_address,
+ None => {
+ return Ok(NonTradeDecodedEventContext {
+ dex_id,
+ pool_id: None,
+ pair_id: None,
+ pair: None,
+ });
+ },
+ };
+ let pool_result =
+ crate::query_pools_get_by_address(self.database.as_ref(), pool_address.as_str()).await;
+ let pool = match pool_result {
+ Ok(Some(pool)) => pool,
+ Ok(None) => {
+ return Ok(NonTradeDecodedEventContext {
+ dex_id,
+ pool_id: None,
+ pair_id: None,
+ pair: None,
+ });
+ },
+ Err(error) => return Err(error),
+ };
+ let pool_id = match pool.id {
+ Some(pool_id) => pool_id,
+ None => {
+ return Ok(NonTradeDecodedEventContext {
+ dex_id,
+ pool_id: None,
+ pair_id: None,
+ pair: None,
+ });
+ },
+ };
+ let pair_result = crate::query_pairs_get_by_pool_id(self.database.as_ref(), pool_id).await;
+ let pair = match pair_result {
+ Ok(pair) => pair,
+ Err(error) => return Err(error),
+ };
+ let pair_id = match pair.as_ref() {
+ Some(pair) => pair.id,
+ None => None,
+ };
+ return Ok(NonTradeDecodedEventContext {
+ dex_id,
+ pool_id: Some(pool_id),
+ pair_id,
+ pair,
+ });
+ }
+}
+
+fn extract_first_amount_string(
+ value: &serde_json::Value,
+ candidate_keys: &[&str],
+) -> std::option::Option {
+ let text = extract_first_string(value, candidate_keys);
+ if text.is_some() {
+ return text;
+ }
+ return extract_first_number_as_string(value, candidate_keys);
+}
+
+fn extract_first_string(
+ value: &serde_json::Value,
+ candidate_keys: &[&str],
+) -> std::option::Option {
+ if let Some(object) = value.as_object() {
+ for candidate_key in candidate_keys {
+ let value_option = object.get(*candidate_key);
+ let candidate = match value_option {
+ Some(candidate) => candidate,
+ None => continue,
+ };
+ if let Some(text) = candidate.as_str() {
+ let trimmed = text.trim();
+ if !trimmed.is_empty() {
+ return Some(trimmed.to_string());
+ }
+ }
+ }
+ for nested_value in object.values() {
+ let nested = extract_first_string(nested_value, candidate_keys);
+ if nested.is_some() {
+ return nested;
+ }
+ }
+ return None;
+ }
+ if let Some(array) = value.as_array() {
+ for nested_value in array {
+ let nested = extract_first_string(nested_value, candidate_keys);
+ if nested.is_some() {
+ return nested;
+ }
+ }
+ }
+ return None;
+}
+
+fn extract_first_number_as_string(
+ value: &serde_json::Value,
+ candidate_keys: &[&str],
+) -> std::option::Option {
+ if let Some(object) = value.as_object() {
+ for candidate_key in candidate_keys {
+ let value_option = object.get(*candidate_key);
+ let candidate = match value_option {
+ Some(candidate) => candidate,
+ None => continue,
+ };
+ if let Some(number) = candidate.as_i64() {
+ return Some(number.to_string());
+ }
+ if let Some(number) = candidate.as_u64() {
+ return Some(number.to_string());
+ }
+ if let Some(number) = candidate.as_f64() {
+ return Some(number.to_string());
+ }
+ }
+ for nested_value in object.values() {
+ let nested = extract_first_number_as_string(nested_value, candidate_keys);
+ if nested.is_some() {
+ return nested;
+ }
+ }
+ return None;
+ }
+ if let Some(array) = value.as_array() {
+ for nested_value in array {
+ let nested = extract_first_number_as_string(nested_value, candidate_keys);
+ if nested.is_some() {
+ return nested;
+ }
+ }
+ }
+ return None;
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn extracts_nested_liquidity_amounts() {
+ let payload = serde_json::json!({
+ "event": {
+ "baseAmountRaw": "100",
+ "quoteAmountRaw": 25,
+ "owner": "Owner111111111111111111111111111111111111"
+ }
+ });
+ assert_eq!(
+ super::extract_first_amount_string(&payload, &["baseAmountRaw"]),
+ Some("100".to_string())
+ );
+ assert_eq!(
+ super::extract_first_amount_string(&payload, &["quoteAmountRaw"]),
+ Some("25".to_string())
+ );
+ assert_eq!(
+ super::extract_first_string(&payload, &["owner"]),
+ Some("Owner111111111111111111111111111111111111".to_string())
+ );
+ }
+}
diff --git a/kb_lib/src/token_backfill.rs b/kb_lib/src/token_backfill.rs
index a0faea6..8c73f51 100644
--- a/kb_lib/src/token_backfill.rs
+++ b/kb_lib/src/token_backfill.rs
@@ -31,6 +31,10 @@ pub struct TokenBackfillResult {
pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize,
+ /// Total number of liquidity event materialization results produced during this run.
+ pub liquidity_event_count: usize,
+ /// Total number of pool lifecycle event materialization results produced during this run.
+ pub pool_lifecycle_event_count: usize,
/// Total number of pair-candle aggregation results produced during this run.
pub pair_candle_count: usize,
}
@@ -60,6 +64,10 @@ pub struct PoolBackfillResult {
pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize,
+ /// Total number of liquidity event materialization results produced during this run.
+ pub liquidity_event_count: usize,
+ /// Total number of pool lifecycle event materialization results produced during this run.
+ pub pool_lifecycle_event_count: usize,
/// Total number of pair-candle aggregation results produced during this run.
pub pair_candle_count: usize,
}
@@ -80,6 +88,7 @@ pub struct TokenBackfillService {
launch_origin_service: crate::LaunchOriginService,
pool_origin_service: crate::PoolOriginService,
wallet_observation_service: crate::WalletObservationService,
+ non_trade_materialization_service: crate::NonTradeEventMaterializationService,
trade_aggregation_service: crate::TradeAggregationService,
pair_candle_aggregation_service: crate::PairCandleAggregationService,
transaction_classification_service: crate::TransactionClassificationService,
@@ -100,6 +109,8 @@ impl TokenBackfillService {
let launch_origin_service = crate::LaunchOriginService::new(database.clone());
let pool_origin_service = crate::PoolOriginService::new(database.clone());
let wallet_observation_service = crate::WalletObservationService::new(database.clone());
+ let non_trade_materialization_service =
+ crate::NonTradeEventMaterializationService::new(database.clone());
let trade_aggregation_service = crate::TradeAggregationService::new(database.clone());
let pair_candle_aggregation_service =
crate::PairCandleAggregationService::new(database.clone());
@@ -121,6 +132,7 @@ impl TokenBackfillService {
launch_origin_service,
pool_origin_service,
wallet_observation_service,
+ non_trade_materialization_service,
trade_aggregation_service,
pair_candle_aggregation_service,
transaction_classification_service,
@@ -149,6 +161,8 @@ impl TokenBackfillService {
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
+ liquidity_event_count: 0,
+ pool_lifecycle_event_count: 0,
pair_candle_count: 0,
};
let mut seen_signatures = std::collections::HashSet::::new();
@@ -221,6 +235,8 @@ impl TokenBackfillService {
"poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count,
+ "liquidityEventCount": result.liquidity_event_count,
+ "poolLifecycleEventCount": result.pool_lifecycle_event_count,
"pairCandleCount": result.pair_candle_count
});
let observation_result = self
@@ -359,6 +375,8 @@ impl TokenBackfillService {
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
+ liquidity_event_count: 0,
+ pool_lifecycle_event_count: 0,
pair_candle_count: 0,
});
}
@@ -424,6 +442,14 @@ impl TokenBackfillService {
Ok(wallet_observations) => wallet_observations,
Err(error) => return Err(error),
};
+ let non_trade_materialization_result = self
+ .non_trade_materialization_service
+ .record_transaction_by_signature(signature.as_str())
+ .await;
+ let non_trade_materialization = match non_trade_materialization_result {
+ Ok(non_trade_materialization) => non_trade_materialization,
+ Err(error) => return Err(error),
+ };
let trade_aggregations_result = self
.trade_aggregation_service
.record_transaction_by_signature(signature.as_str())
@@ -456,6 +482,8 @@ impl TokenBackfillService {
pool_origin_count: pool_origins.len(),
wallet_participation_count: wallet_observations.len(),
trade_event_count: trade_aggregations.len(),
+ liquidity_event_count: non_trade_materialization.liquidity_event_count,
+ pool_lifecycle_event_count: non_trade_materialization.pool_lifecycle_event_count,
pair_candle_count: pair_candle_aggregations.len(),
});
}
@@ -479,6 +507,8 @@ impl TokenBackfillService {
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
+ liquidity_event_count: 0,
+ pool_lifecycle_event_count: 0,
pair_candle_count: 0,
};
let mut seen_addresses = std::collections::BTreeSet::::new();
@@ -562,6 +592,8 @@ impl TokenBackfillService {
result.pool_origin_count += replay_result.pool_origin_count;
result.wallet_participation_count += replay_result.wallet_participation_count;
result.trade_event_count += replay_result.trade_event_count;
+ result.liquidity_event_count += replay_result.liquidity_event_count;
+ result.pool_lifecycle_event_count += replay_result.pool_lifecycle_event_count;
result.pair_candle_count += replay_result.pair_candle_count;
}
}
@@ -578,6 +610,8 @@ impl TokenBackfillService {
"poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count,
+ "liquidityEventCount": result.liquidity_event_count,
+ "poolLifecycleEventCount": result.pool_lifecycle_event_count,
"pairCandleCount": result.pair_candle_count,
"scannedAddressCount": addresses_to_scan.len(),
"effectiveSignatureLimit": effective_limit
@@ -651,6 +685,8 @@ struct TokenBackfillSignatureResult {
pool_origin_count: usize,
wallet_participation_count: usize,
trade_event_count: usize,
+ liquidity_event_count: usize,
+ pool_lifecycle_event_count: usize,
pair_candle_count: usize,
}
@@ -666,6 +702,8 @@ fn merge_token_backfill_signature_result(
aggregate.pool_origin_count += value.pool_origin_count;
aggregate.wallet_participation_count += value.wallet_participation_count;
aggregate.trade_event_count += value.trade_event_count;
+ aggregate.liquidity_event_count += value.liquidity_event_count;
+ aggregate.pool_lifecycle_event_count += value.pool_lifecycle_event_count;
aggregate.pair_candle_count += value.pair_candle_count;
}
diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs
index 76b7ddc..1b2c555 100644
--- a/kb_lib/src/tx_resolution.rs
+++ b/kb_lib/src/tx_resolution.rs
@@ -105,6 +105,7 @@ pub struct TransactionResolutionService {
launch_origin_service: crate::LaunchOriginService,
pool_origin_service: crate::PoolOriginService,
wallet_observation_service: crate::WalletObservationService,
+ non_trade_materialization_service: crate::NonTradeEventMaterializationService,
trade_aggregation_service: crate::TradeAggregationService,
wallet_holding_observation_service: crate::WalletHoldingObservationService,
pair_candle_aggregation_service: crate::PairCandleAggregationService,
@@ -128,6 +129,8 @@ impl TransactionResolutionService {
let launch_origin_service = crate::LaunchOriginService::new(database.clone());
let pool_origin_service = crate::PoolOriginService::new(database.clone());
let wallet_observation_service = crate::WalletObservationService::new(database.clone());
+ let non_trade_materialization_service =
+ crate::NonTradeEventMaterializationService::new(database.clone());
let trade_aggregation_service = crate::TradeAggregationService::new(database.clone());
let wallet_holding_observation_service =
crate::WalletHoldingObservationService::new(database.clone());
@@ -146,6 +149,7 @@ impl TransactionResolutionService {
launch_origin_service,
pool_origin_service,
wallet_observation_service,
+ non_trade_materialization_service,
trade_aggregation_service,
wallet_holding_observation_service,
pair_candle_aggregation_service,
@@ -377,6 +381,16 @@ impl TransactionResolutionService {
Err(error) => return Err(error),
};
let wallet_holding_count = wallet_holding_observations.len();
+ let non_trade_materialization_result = self
+ .non_trade_materialization_service
+ .record_transaction_by_signature(request.signature.as_str())
+ .await;
+ let non_trade_materialization = match non_trade_materialization_result {
+ Ok(non_trade_materialization) => non_trade_materialization,
+ Err(error) => return Err(error),
+ };
+ let liquidity_event_count = non_trade_materialization.liquidity_event_count;
+ let pool_lifecycle_event_count = non_trade_materialization.pool_lifecycle_event_count;
let trade_aggregations_result = self
.trade_aggregation_service
.record_transaction_by_signature(request.signature.as_str())
@@ -429,6 +443,8 @@ impl TransactionResolutionService {
"poolOriginCount": pool_origin_count,
"walletParticipationCount": wallet_participation_count,
"walletHoldingCount": wallet_holding_count,
+ "liquidityEventCount": liquidity_event_count,
+ "poolLifecycleEventCount": pool_lifecycle_event_count,
"tradeEventCount": trade_event_count,
"pairCandleCount": pair_candle_count,
"pairAnalyticSignalCount": pair_analytic_signal_count,