From c542aa9d32386df97d3214399a37685da8dcb929 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Fri, 1 May 2026 00:29:32 +0200 Subject: [PATCH] 0.7.22 --- .gitignore | 1 + CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 95 +- kb_app/capabilities/default.json | 3 +- kb_app/frontend/demo_pipeline.html | 255 +++++ kb_app/frontend/main.html | 6 + kb_app/frontend/ts/demo_pipeline.ts | 420 ++++++++ kb_app/frontend/ts/main.ts | 33 +- kb_app/gen/schemas/capabilities.json | 2 +- kb_app/package.json | 4 +- kb_app/src/demo_pipeline.rs | 1494 ++++++++++++++++++++++++++ kb_app/src/lib.rs | 28 +- kb_app/src/main.rs | 18 +- kb_app/tauri.conf.json | 16 +- kb_lib/src/config.rs | 9 +- kb_lib/src/db/sqlite.rs | 9 +- 17 files changed, 2347 insertions(+), 49 deletions(-) create mode 100644 kb_app/frontend/demo_pipeline.html create mode 100644 kb_app/frontend/ts/demo_pipeline.ts create mode 100644 kb_app/src/demo_pipeline.rs diff --git a/.gitignore b/.gitignore index 3f251b9..dafeef1 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,7 @@ config.json # sqlite +data dbdata *.db *.db-shm diff --git a/CHANGELOG.md b/CHANGELOG.md index e5e3689..b4eb98b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,3 +52,4 @@ 0.7.19 - Ajout d’une première couche holdings observés avec agrégation par couple wallet/token et branchement automatique dans le pipeline de résolution transactionnelle 0.7.20 - Ajout d’une première couche candles / OHLCV avec matérialisation en base des timeframes usuels et régénération à la demande pour un timeframe arbitraire depuis les trade events 0.7.21 - Ajout d’une première couche de signaux analytiques enrichis par paire avec persistance dédiée et détection de first trade, trade burst, buy/sell imbalance, price jump et volume spike +0.7.22 - Ajout d’une première fenêtre `Demo Pipeline` dans `kb_app` pour l’inspection en lecture seule du pipeline `0.7.x`, avec recherche par signature, token mint, pair id ou pool address, affichage structuré des transactions résolues, événements DEX décodés, pools, paires, listings, launch origins, pool origins, wallets et holdings observés, trade events, pair metrics, candles et signaux analytiques déjà persistés, ainsi que conservation d’une instance partagée de la base SQLite pour éviter la réouverture et la réinitialisation du schéma à chaque commande UI diff --git a/Cargo.toml b/Cargo.toml index 6092a30..43ae483 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.21" +version = "0.7.22" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 358817a..22430a7 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -622,7 +622,7 @@ Réalisé : - ajout d’une collecte de cibles `accountSubscribe` à partir des pools actifs connus, - ajout d’une couche d’observations techniques WS hybrides pour `logs / program / account`, - ajout d’une première déduplication en mémoire des notifications techniques reçues en parallèle, -- ajout d’une façade runtime pour exposer ce comportement au futur branchement `ws_manager`. +- ajout d’une façade runtime pour exposer ce comportement au branchement `ws_manager`. ### 6.050. Version `0.7.18` — Backfill historique ciblé par token Réalisé : @@ -663,17 +663,73 @@ Réalisé : - branchement automatique dans le pipeline de résolution transactionnelle. ### 6.054. Version `0.7.22` — `kb_app` : inspection et tests du pipeline `0.7.x` -Objectif : permettre depuis l’application desktop de tester, inspecter et valider tout le pipeline `0.7.x` sans recourir uniquement aux logs bruts ou à SQLite. +Réalisé : + +- ajout d’une fenêtre dédiée `Demo Pipeline` dans `kb_app`, +- inspection du pipeline persistant par `signature`, +- inspection du pipeline persistant par `token mint`, +- inspection du pipeline persistant par `pair id`, +- inspection du pipeline persistant par `pool address`, +- affichage structuré des transactions résolues, événements DEX décodés, pools, paires, listings, launch origins, pool origins, wallets observés, holdings observés, trade events, pair metrics, candles et signaux analytiques, +- possibilité d’utiliser un timeframe custom pour régénérer à la demande les candles non matérialisées, +- conservation d’une instance partagée de `KbDatabase` dans `kb_app` afin d’éviter la réouverture de la base et la réinitialisation du schéma à chaque commande UI, +- validation pratique de l’inspection du pipeline `0.7.x` sans dépendre uniquement des logs bruts ou de la consultation manuelle de SQLite. + +### 6.055. Version `0.7.23` — `kb_app` : backfill token et inspection ciblée +Objectif : piloter le backfill historique depuis l’interface desktop et afficher le résultat de façon exploitable. À faire : -- ajouter une ou plusieurs vues `kb_app` dédiées à l’inspection des transactions résolues, événements DEX décodés, pools, paires, launch origins, pool origins, wallets observés, holdings observés et trade events, -- permettre la recherche par signature, pool, paire, token mint ou wallet, -- afficher les liens entre objets techniques et objets métier, -- permettre de lancer manuellement certains backfills ou inspections ciblées, -- fournir un socle UI pour tester en pratique tout ce qui a été construit dans la série `0.7.x`. +- ajouter une vue de saisie d’un `token_mint`, +- permettre le déclenchement manuel du `KbTokenBackfillService`, +- afficher le résumé du backfill : signatures mint, pools retrouvés, signatures de pools, transactions résolues, decoded events, trade events, candles et analytic signals, +- permettre une navigation simple entre token, pools, paires et événements liés, +- préparer la réexécution ciblée de backfills sans casser l’idempotence du modèle. -### 6.055. Version `0.7.x` — Couverture DEX v1 +### 6.056. Version `0.7.24` — `kb_app` : visualisation candles / OHLCV +Objectif : fournir une vue graphique exploitable des candles via `echarts`. + +À faire : + +- ajouter une vue de sélection de paire, +- permettre le choix du timeframe, +- lire les candles matérialisées pour les timeframes usuels, +- permettre la régénération à la demande pour un timeframe arbitraire, +- afficher les chandeliers, les volumes et la navigation temporelle, +- préparer l’affichage d’overlays analytiques. + +### 6.057. Version `0.7.25` — `kb_app` : overlays analytiques +Objectif : rendre visibles les signaux analytiques directement sur les graphes et vues de marché. + +À faire : + +- afficher les signaux analytiques par bucket au-dessus ou autour des candles, +- ajouter des marqueurs pour `first_trade_seen`, `trade_burst_60s`, `buy_sell_imbalance_60s`, `price_jump_up_60s`, `price_jump_down_60s` et `volume_spike_60s`, +- permettre le filtrage par type de signal et par sévérité, +- afficher un panneau latéral listant les signaux liés à une paire et à un timeframe. + +### 6.058. Version `0.7.26` — `kb_app` : vues consolidées token / pair / pool +Objectif : fournir une lecture métier plus confortable du modèle `0.7.x`. + +À faire : + +- ajouter une fiche token, +- ajouter une fiche paire, +- ajouter une fiche pool, +- relier dans l’UI les launch origins, pool origins, wallets observés, holdings observés, candles et analytic signals, +- préparer une navigation transversale entre objets techniques et objets métier. + +### 6.059. Version `0.7.27` — Finition UI `0.7.x` +Objectif : stabiliser la couche desktop de validation avant l’ouverture de `0.8.x`. + +À faire : + +- consolider les vues ajoutées dans `kb_app`, +- améliorer la navigation, les filtres et la pagination, +- ajouter les derniers raffinements de confort et de lisibilité, +- préparer une base UI suffisamment stable pour la future phase d’analyse et filtrage `0.8.x`. + +### 6.060. Version `0.7.x` — Couverture DEX v1 Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier. Protocoles cibles : @@ -700,7 +756,7 @@ Résultat attendu : - préparation d’une détection temps réel hybride et d’un backfill ciblé compatible avec les mêmes objets métier, - préparation d’agrégats DEX plus riches, de candles / OHLCV et d’une UI d’inspection du pipeline `0.7.x`. -### 6.056. Version `0.8.x` — Analyse et filtrage +### 6.061. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -712,7 +768,7 @@ Objectif : transformer les événements bruts en signaux exploitables. - premiers patterns, - enrichissement des signaux analytiques préparés en fin de `0.7.x`. -### 6.057. Version `1.x.y` — Wallets et swap préparatoire +### 6.062. Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -723,7 +779,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.058. Version `2.x.y` — Trading semi-automatisé +### 6.063. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -734,7 +790,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.059. Version `3.x.y` — Yellowstone gRPC +### 6.064. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : @@ -822,11 +878,10 @@ Le projet doit maintenir au minimum : La priorité immédiate est désormais la suivante : -1. finaliser complètement la fin de série `0.7.x` avant l’ouverture de `0.8.x`, -2. ajouter un renforcement temps réel hybride avec `logsSubscribe`, `programSubscribe` et `accountSubscribe` en parallèle des sources déjà exploitées, -3. conserver la résolution transactionnelle comme source de normalisation commune, -4. ajouter ensuite un mode de backfill historique ciblé par `token_mint` pour des tokens encore actifs donnés explicitement, -5. compléter la couche métier avec des `holdings observés`, -6. ajouter des `candles / OHLCV` et une première couche de signaux analytiques plus riches, -7. doter `kb_app` d’une vraie UI d’inspection et de test pour l’ensemble du pipeline `0.7.x`, -8. préparer enfin l’arrivée de Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle existant. +1. poursuivre la fin de série `0.7.x` côté `kb_app` avant l’ouverture de `0.8.x`, +2. ajouter un pilotage UI du backfill historique ciblé par `token_mint`, +3. ajouter une vue graphique des candles / OHLCV avec `echarts`, +4. ajouter les overlays des signaux analytiques sur les candles, +5. consolider les vues métier `token / pair / pool` dans `kb_app`, +6. stabiliser l’ergonomie, les filtres et la navigation de l’UI d’inspection, +7. préparer enfin l’arrivée de Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle existant. diff --git a/kb_app/capabilities/default.json b/kb_app/capabilities/default.json index 2d822f0..2e2236c 100644 --- a/kb_app/capabilities/default.json +++ b/kb_app/capabilities/default.json @@ -7,7 +7,8 @@ "splash", "demo_ws", "demo_http", - "demo_ws_manager" + "demo_ws_manager", + "demo_pipeline" ], "permissions": [ "core:default", diff --git a/kb_app/frontend/demo_pipeline.html b/kb_app/frontend/demo_pipeline.html new file mode 100644 index 0000000..4f26d27 --- /dev/null +++ b/kb_app/frontend/demo_pipeline.html @@ -0,0 +1,255 @@ + + + + + + + + Khadhroony-BoBoBot — Demo Pipeline + + + + +
+ +
+ +
+
+
+
+
+
+
+

Inspection par signature

+

+ Cette fenêtre inspecte la donnée déjà persistée dans kb_lib pour une + signature donnée et affiche l’état du pipeline 0.7.x. +

+ +
+ + +
+ +
+ + +
+ Les timeframes matérialisés restent chargés depuis la base. Une valeur custom déclenche + une régénération à la demande. +
+
+ +
+ + +
+ +
+ +
+ + +
+ +
+ +
+ +
+ +
+ + +
+ +
+ +
+ +
+ + +
+ +
+ +
+ +
+ +
+
But : vérifier rapidement la cohérence du pipeline 0.7.x.
+
Portée : lecture seule depuis SQLite via kb_lib.
+
+
+
+
+ +
+
+
+

Résumé

+ +
+
+ +
+
+

+ +

+
+
+ +
+
+
+ +
+

+ +

+
+
+ +
+
+
+ +
+

+ +

+
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+
+ +
+

+ +

+
+
+ +
+
+
+ +
+

+ +

+
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+
+
+
+ +
+
+
+

Log UI

+ +
+ +
+
+
+
+
+
+
+ + + + + + + \ No newline at end of file diff --git a/kb_app/frontend/main.html b/kb_app/frontend/main.html index 48cffa0..38ed8db 100644 --- a/kb_app/frontend/main.html +++ b/kb_app/frontend/main.html @@ -28,6 +28,9 @@ + @@ -72,6 +75,9 @@ +
diff --git a/kb_app/frontend/ts/demo_pipeline.ts b/kb_app/frontend/ts/demo_pipeline.ts new file mode 100644 index 0000000..065762f --- /dev/null +++ b/kb_app/frontend/ts/demo_pipeline.ts @@ -0,0 +1,420 @@ +// file: kb_app/frontend/ts/demo_pipeline.ts + +import * as bootstrap from "bootstrap"; +import "simplebar"; +import ResizeObserver from "resize-observer-polyfill"; +import { invoke } from "@tauri-apps/api/core"; +import { debug, takeoverConsole } from "@fltsci/tauri-plugin-tracing"; + +(window as Window & typeof globalThis & { bootstrap?: typeof bootstrap }).bootstrap = bootstrap; +(window as Window & typeof globalThis & { ResizeObserver?: typeof ResizeObserver }).ResizeObserver = ResizeObserver; + +interface DemoPipelineInspectRequest { + signature: string; + customTimeframeSeconds: number | null; +} + +interface DemoPipelineInspectPayload { + signature: string; + summaryJson: string; + transactionJson: string; + decodedEventsJson: string; + poolsJson: string; + pairsJson: string; + launchAttributionsJson: string; + poolOriginsJson: string; + walletsJson: string; + tradeEventsJson: string; + pairMetricsJson: string; + pairCandlesJson: string; + pairAnalyticSignalsJson: string; +} + +interface DemoPipelineInspectTokenRequest { + tokenMint: string; + customTimeframeSeconds: number | null; +} + +interface DemoPipelineInspectPairRequest { + pairId: number; + customTimeframeSeconds: number | null; +} + +interface DemoPipelineInspectPoolRequest { + poolAddress: string; + customTimeframeSeconds: number | null; +} + +function appendLogLine(textarea: HTMLTextAreaElement, line: string): void { + const now = new Date(); + const timestamp = now.toLocaleTimeString("fr-CH", { hour12: false }); + + const lines = textarea.value === "" ? [] : textarea.value.split("\n"); + lines.push(`[${timestamp}] ${line}`); + + const maxLines = 300; + textarea.value = lines.slice(-maxLines).join("\n"); + textarea.scrollTop = textarea.scrollHeight; +} + +function clearInspection( + summaryTextarea: HTMLTextAreaElement, + transactionTextarea: HTMLTextAreaElement, + decodedEventsTextarea: HTMLTextAreaElement, + poolsTextarea: HTMLTextAreaElement, + pairsTextarea: HTMLTextAreaElement, + launchAttributionsTextarea: HTMLTextAreaElement, + poolOriginsTextarea: HTMLTextAreaElement, + walletsTextarea: HTMLTextAreaElement, + tradeEventsTextarea: HTMLTextAreaElement, + pairMetricsTextarea: HTMLTextAreaElement, + pairCandlesTextarea: HTMLTextAreaElement, + pairAnalyticSignalsTextarea: HTMLTextAreaElement, +): void { + summaryTextarea.value = ""; + transactionTextarea.value = ""; + decodedEventsTextarea.value = ""; + poolsTextarea.value = ""; + pairsTextarea.value = ""; + launchAttributionsTextarea.value = ""; + poolOriginsTextarea.value = ""; + walletsTextarea.value = ""; + tradeEventsTextarea.value = ""; + pairMetricsTextarea.value = ""; + pairCandlesTextarea.value = ""; + pairAnalyticSignalsTextarea.value = ""; +} + +function readCustomTimeframeSeconds( + input: HTMLInputElement, + logTextarea: HTMLTextAreaElement, +): number | null | undefined { + const customTimeframeText = input.value.trim(); + if (customTimeframeText === "") { + return null; + } + + const parsed = Number.parseInt(customTimeframeText, 10); + if (Number.isNaN(parsed) || parsed <= 0) { + appendLogLine(logTextarea, `[ui] invalid custom timeframe '${customTimeframeText}'`); + return undefined; + } + + return parsed; +} + +document.addEventListener("DOMContentLoaded", async () => { + void takeoverConsole(); + debug("demo_pipeline window loaded"); + + const sidebarToggle = document.querySelector('#sidebarToggle'); + if (sidebarToggle) { + // restaurer l’état depuis localStorage + if (localStorage.getItem('sidebar-toggle') === 'true') { + document.body.classList.add('sidenav-toggled'); + } + + sidebarToggle.addEventListener('click', (event) => { + event.preventDefault(); + document.body.classList.toggle('sidenav-toggled'); + localStorage.setItem('sidebar-toggle', document.body.classList.contains('sidenav-toggled') ? 'true' : 'false'); + }); + } + + const tooltipTriggerList = document.querySelectorAll('[data-bs-toggle="tooltip"]'); + Array.from(tooltipTriggerList).map(tooltipTriggerEl => new bootstrap.Tooltip(tooltipTriggerEl)); + const toastElList = document.querySelectorAll('.toast'); + Array.from(toastElList).map(toastEl => new bootstrap.Toast(toastEl)); + const popoverTriggerList = document.querySelectorAll('[data-bs-toggle="popover"]'); + Array.from(popoverTriggerList).map(popoverTriggerEl => new bootstrap.Popover(popoverTriggerEl)); + + const gobackto = location.pathname + location.search; + + document.querySelectorAll('a[data-setlang]').forEach((a) => { + const href = a.getAttribute("href"); + if (!href) return; // pas de href => on ignore + + const url = new URL(href, location.origin); + url.searchParams.set("gobackto", gobackto); + + // conserve une URL relative (path + query) + a.setAttribute("href", url.pathname + "?" + url.searchParams.toString()); + }); + + const signatureInput = document.querySelector("#demoPipelineSignatureInput"); + const customTimeframeInput = document.querySelector("#demoPipelineCustomTimeframeInput"); + const inspectButton = document.querySelector("#demoPipelineInspectButton"); + const clearButton = document.querySelector("#demoPipelineClearButton"); + const clearLogButton = document.querySelector("#demoPipelineClearLogButton"); + + const summaryTextarea = document.querySelector("#demoPipelineSummaryTextarea"); + const transactionTextarea = document.querySelector("#demoPipelineTransactionTextarea"); + const decodedEventsTextarea = document.querySelector("#demoPipelineDecodedEventsTextarea"); + const poolsTextarea = document.querySelector("#demoPipelinePoolsTextarea"); + const pairsTextarea = document.querySelector("#demoPipelinePairsTextarea"); + const launchAttributionsTextarea = document.querySelector("#demoPipelineLaunchAttributionsTextarea"); + const poolOriginsTextarea = document.querySelector("#demoPipelinePoolOriginsTextarea"); + const walletsTextarea = document.querySelector("#demoPipelineWalletsTextarea"); + const tradeEventsTextarea = document.querySelector("#demoPipelineTradeEventsTextarea"); + const pairMetricsTextarea = document.querySelector("#demoPipelinePairMetricsTextarea"); + const pairCandlesTextarea = document.querySelector("#demoPipelinePairCandlesTextarea"); + const pairAnalyticSignalsTextarea = document.querySelector("#demoPipelinePairAnalyticSignalsTextarea"); + const logTextarea = document.querySelector("#demoPipelineLogTextarea"); + const tokenMintInput = document.querySelector("#demoPipelineTokenMintInput"); + const inspectTokenButton = document.querySelector("#demoPipelineInspectTokenButton"); + const pairIdInput = document.querySelector("#demoPipelinePairIdInput"); + const inspectPairButton = document.querySelector("#demoPipelineInspectPairButton"); + const poolAddressInput = document.querySelector("#demoPipelinePoolAddressInput"); + const inspectPoolButton = document.querySelector("#demoPipelineInspectPoolButton"); + + if ( + !pairIdInput || + !inspectPairButton || + !poolAddressInput || + !inspectPoolButton || + !tokenMintInput || + !inspectTokenButton || + !signatureInput || + !customTimeframeInput || + !inspectButton || + !clearButton || + !clearLogButton || + !summaryTextarea || + !transactionTextarea || + !decodedEventsTextarea || + !poolsTextarea || + !pairsTextarea || + !launchAttributionsTextarea || + !poolOriginsTextarea || + !walletsTextarea || + !tradeEventsTextarea || + !pairMetricsTextarea || + !pairCandlesTextarea || + !pairAnalyticSignalsTextarea || + !logTextarea + ) { + console.error("demo_pipeline DOM is incomplete"); + return; + } + + clearButton.addEventListener("click", () => { + clearInspection( + summaryTextarea, + transactionTextarea, + decodedEventsTextarea, + poolsTextarea, + pairsTextarea, + launchAttributionsTextarea, + poolOriginsTextarea, + walletsTextarea, + tradeEventsTextarea, + pairMetricsTextarea, + pairCandlesTextarea, + pairAnalyticSignalsTextarea, + ); + signatureInput.value = ""; + customTimeframeInput.value = ""; + tokenMintInput.value = ""; + pairIdInput.value = ""; + poolAddressInput.value = ""; + appendLogLine(logTextarea, "[ui] inspection state cleared"); + }); + + clearLogButton.addEventListener("click", () => { + logTextarea.value = ""; + }); + + inspectButton.addEventListener("click", async () => { + const signature = signatureInput.value.trim(); + if (signature === "") { + appendLogLine(logTextarea, "[ui] signature is required"); + return; + } + + let customTimeframeSeconds: number | null = null; + const customTimeframeText = customTimeframeInput.value.trim(); + if (customTimeframeText !== "") { + const parsed = Number.parseInt(customTimeframeText, 10); + if (Number.isNaN(parsed) || parsed <= 0) { + appendLogLine(logTextarea, `[ui] invalid custom timeframe '${customTimeframeText}'`); + return; + } + customTimeframeSeconds = parsed; + } + + appendLogLine( + logTextarea, + `[ui] inspecting signature '${signature}'${customTimeframeSeconds === null ? "" : ` with custom timeframe ${customTimeframeSeconds}s`}`, + ); + + const request: DemoPipelineInspectRequest = { + signature, + customTimeframeSeconds, + }; + + try { + const payload = await invoke("demo_pipeline_inspect_signature", { request }); + + summaryTextarea.value = payload.summaryJson; + transactionTextarea.value = payload.transactionJson; + decodedEventsTextarea.value = payload.decodedEventsJson; + poolsTextarea.value = payload.poolsJson; + pairsTextarea.value = payload.pairsJson; + launchAttributionsTextarea.value = payload.launchAttributionsJson; + poolOriginsTextarea.value = payload.poolOriginsJson; + walletsTextarea.value = payload.walletsJson; + tradeEventsTextarea.value = payload.tradeEventsJson; + pairMetricsTextarea.value = payload.pairMetricsJson; + pairCandlesTextarea.value = payload.pairCandlesJson; + pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson; + + appendLogLine(logTextarea, `[ui] inspection completed for '${payload.signature}'`); + } catch (error) { + appendLogLine(logTextarea, `[ui] inspect error: ${String(error)}`); + } + }); + + inspectTokenButton.addEventListener("click", async () => { + const tokenMint = tokenMintInput.value.trim(); + if (tokenMint === "") { + appendLogLine(logTextarea, "[ui] token mint is required"); + return; + } + + let customTimeframeSeconds: number | null = null; + const customTimeframeText = customTimeframeInput.value.trim(); + if (customTimeframeText !== "") { + const parsed = Number.parseInt(customTimeframeText, 10); + if (Number.isNaN(parsed) || parsed <= 0) { + appendLogLine(logTextarea, `[ui] invalid custom timeframe '${customTimeframeText}'`); + return; + } + customTimeframeSeconds = parsed; + } + + appendLogLine( + logTextarea, + `[ui] inspecting token mint '${tokenMint}'${customTimeframeSeconds === null ? "" : ` with custom timeframe ${customTimeframeSeconds}s`}`, + ); + + const request: DemoPipelineInspectTokenRequest = { + tokenMint, + customTimeframeSeconds, + }; + + try { + const payload = await invoke("demo_pipeline_inspect_token_mint", { request }); + + summaryTextarea.value = payload.summaryJson; + transactionTextarea.value = payload.transactionJson; + decodedEventsTextarea.value = payload.decodedEventsJson; + poolsTextarea.value = payload.poolsJson; + pairsTextarea.value = payload.pairsJson; + launchAttributionsTextarea.value = payload.launchAttributionsJson; + poolOriginsTextarea.value = payload.poolOriginsJson; + walletsTextarea.value = payload.walletsJson; + tradeEventsTextarea.value = payload.tradeEventsJson; + pairMetricsTextarea.value = payload.pairMetricsJson; + pairCandlesTextarea.value = payload.pairCandlesJson; + pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson; + + appendLogLine(logTextarea, `[ui] token inspection completed for '${payload.signature}'`); + } catch (error) { + appendLogLine(logTextarea, `[ui] token inspect error: ${String(error)}`); + } + }); + + inspectPairButton.addEventListener("click", async () => { + const pairIdText = pairIdInput.value.trim(); + if (pairIdText === "") { + appendLogLine(logTextarea, "[ui] pair id is required"); + return; + } + + const parsedPairId = Number.parseInt(pairIdText, 10); + if (Number.isNaN(parsedPairId) || parsedPairId <= 0) { + appendLogLine(logTextarea, `[ui] invalid pair id '${pairIdText}'`); + return; + } + + const customTimeframeSeconds = readCustomTimeframeSeconds(customTimeframeInput, logTextarea); + if (customTimeframeSeconds === undefined) { + return; + } + + appendLogLine( + logTextarea, + `[ui] inspecting pair id '${parsedPairId}'${customTimeframeSeconds === null ? "" : ` with custom timeframe ${customTimeframeSeconds}s`}`, + ); + + const request: DemoPipelineInspectPairRequest = { + pairId: parsedPairId, + customTimeframeSeconds, + }; + + try { + const payload = await invoke("demo_pipeline_inspect_pair_id", { request }); + + summaryTextarea.value = payload.summaryJson; + transactionTextarea.value = payload.transactionJson; + decodedEventsTextarea.value = payload.decodedEventsJson; + poolsTextarea.value = payload.poolsJson; + pairsTextarea.value = payload.pairsJson; + launchAttributionsTextarea.value = payload.launchAttributionsJson; + poolOriginsTextarea.value = payload.poolOriginsJson; + walletsTextarea.value = payload.walletsJson; + tradeEventsTextarea.value = payload.tradeEventsJson; + pairMetricsTextarea.value = payload.pairMetricsJson; + pairCandlesTextarea.value = payload.pairCandlesJson; + pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson; + + appendLogLine(logTextarea, `[ui] pair inspection completed for '${payload.signature}'`); + } catch (error) { + appendLogLine(logTextarea, `[ui] pair inspect error: ${String(error)}`); + } + }); + + inspectPoolButton.addEventListener("click", async () => { + const poolAddress = poolAddressInput.value.trim(); + if (poolAddress === "") { + appendLogLine(logTextarea, "[ui] pool address is required"); + return; + } + + const customTimeframeSeconds = readCustomTimeframeSeconds(customTimeframeInput, logTextarea); + if (customTimeframeSeconds === undefined) { + return; + } + + appendLogLine( + logTextarea, + `[ui] inspecting pool '${poolAddress}'${customTimeframeSeconds === null ? "" : ` with custom timeframe ${customTimeframeSeconds}s`}`, + ); + + const request: DemoPipelineInspectPoolRequest = { + poolAddress, + customTimeframeSeconds, + }; + + try { + const payload = await invoke("demo_pipeline_inspect_pool_address", { request }); + + summaryTextarea.value = payload.summaryJson; + transactionTextarea.value = payload.transactionJson; + decodedEventsTextarea.value = payload.decodedEventsJson; + poolsTextarea.value = payload.poolsJson; + pairsTextarea.value = payload.pairsJson; + launchAttributionsTextarea.value = payload.launchAttributionsJson; + poolOriginsTextarea.value = payload.poolOriginsJson; + walletsTextarea.value = payload.walletsJson; + tradeEventsTextarea.value = payload.tradeEventsJson; + pairMetricsTextarea.value = payload.pairMetricsJson; + pairCandlesTextarea.value = payload.pairCandlesJson; + pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson; + + appendLogLine(logTextarea, `[ui] pool inspection completed for '${payload.signature}'`); + } catch (error) { + appendLogLine(logTextarea, `[ui] pool inspect error: ${String(error)}`); + } + }); +}); \ No newline at end of file diff --git a/kb_app/frontend/ts/main.ts b/kb_app/frontend/ts/main.ts index 81112d9..487feac 100644 --- a/kb_app/frontend/ts/main.ts +++ b/kb_app/frontend/ts/main.ts @@ -31,8 +31,19 @@ async function openDemoWsManagerWindow(): Promise { console.error("open_demo_ws_manager_window failed:", error); } } + +async function openDemoPipelineWindow(): Promise { + try { + await invoke("open_demo_pipeline_window"); + } catch (error) { + console.error("open_demo_pipeline_window failed:", error); + } +} document.addEventListener("DOMContentLoaded", async () => { void takeoverConsole(); + + debug("main window loaded"); + const sidebarToggle = document.querySelector('#sidebarToggle'); if (sidebarToggle) { // restaurer l’état depuis localStorage @@ -66,14 +77,16 @@ document.addEventListener("DOMContentLoaded", async () => { // conserve une URL relative (path + query) a.setAttribute("href", url.pathname + "?" + url.searchParams.toString()); }); - + const openDemoWsButton = document.querySelector("#openDemoWsButton"); const openDemoWsButtonSecondary = document.querySelector("#openDemoWsButtonSecondary"); - + const openDemoHttpButton = document.querySelector("#openDemoHttpButton"); const openDemoHttpButtonSecondary = document.querySelector("#openDemoHttpButtonSecondary"); const openDemoWsManagerButton = document.querySelector("#openDemoWsManagerButton"); const openDemoWsManagerButtonSecondary = document.querySelector("#openDemoWsManagerButtonSecondary"); + const openDemoPipelineButton = document.querySelector("#openDemoPipelineButton"); + const openDemoPipelineButtonSecondary = document.querySelector("#openDemoPipelineButtonSecondary"); if (openDemoWsButton) { openDemoWsButton.addEventListener("click", () => { @@ -86,7 +99,7 @@ document.addEventListener("DOMContentLoaded", async () => { void openDemoWsWindow(); }); } - + if (openDemoHttpButton) { openDemoHttpButton.addEventListener("click", () => { void openDemoHttpWindow(); @@ -110,7 +123,17 @@ document.addEventListener("DOMContentLoaded", async () => { void openDemoWsManagerWindow(); }); } - - debug("window loaded"); + + if (openDemoPipelineButton) { + openDemoPipelineButton.addEventListener("click", () => { + void openDemoPipelineWindow(); + }); + } + + if (openDemoPipelineButtonSecondary) { + openDemoPipelineButtonSecondary.addEventListener("click", () => { + void openDemoPipelineWindow(); + }); + } }); \ No newline at end of file diff --git a/kb_app/gen/schemas/capabilities.json b/kb_app/gen/schemas/capabilities.json index 99db2a1..04512ec 100644 --- a/kb_app/gen/schemas/capabilities.json +++ b/kb_app/gen/schemas/capabilities.json @@ -1 +1 @@ -{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager"],"permissions":["core:default","tracing:default"]}} \ No newline at end of file +{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager","demo_pipeline"],"permissions":["core:default","tracing:default"]}} \ No newline at end of file diff --git a/kb_app/package.json b/kb_app/package.json index 1c58b35..c923029 100644 --- a/kb_app/package.json +++ b/kb_app/package.json @@ -1,7 +1,7 @@ { "name": "kb-app", "private": true, - "version": "0.6.6", + "version": "0.7.22", "type": "module", "scripts": { "dev": "vite", @@ -26,4 +26,4 @@ "typescript": "^5.9", "vite": "^8.0" } -} +} \ No newline at end of file diff --git a/kb_app/src/demo_pipeline.rs b/kb_app/src/demo_pipeline.rs new file mode 100644 index 0000000..281e049 --- /dev/null +++ b/kb_app/src/demo_pipeline.rs @@ -0,0 +1,1494 @@ +// file: kb_app/src/demo_pipeline.rs + +//! Tauri commands for the pipeline inspection demo window. + +use tauri::Manager; + +/// Request payload for one pipeline inspection by signature. +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoPipelineInspectRequest { + /// Transaction signature to inspect. + pub signature: std::string::String, + /// Optional custom timeframe in seconds for on-demand candle rebuild. + pub custom_timeframe_seconds: std::option::Option, +} + +/// Response payload for one pipeline inspection. +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoPipelineInspectPayload { + /// Inspected signature. + pub signature: std::string::String, + /// Summary JSON block. + pub summary_json: std::string::String, + /// Resolved transaction JSON block. + pub transaction_json: std::string::String, + /// Decoded events JSON block. + pub decoded_events_json: std::string::String, + /// Pools JSON block. + pub pools_json: std::string::String, + /// Pairs JSON block. + pub pairs_json: std::string::String, + /// Launch attributions JSON block. + pub launch_attributions_json: std::string::String, + /// Pool origins JSON block. + pub pool_origins_json: std::string::String, + /// Wallet inspection JSON block. + pub wallets_json: std::string::String, + /// Trade events JSON block. + pub trade_events_json: std::string::String, + /// Pair metrics JSON block. + pub pair_metrics_json: std::string::String, + /// Pair candles JSON block. + pub pair_candles_json: std::string::String, + /// Pair analytic signals JSON block. + pub pair_analytic_signals_json: std::string::String, +} + +/// Request payload for one pipeline inspection by token mint. +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoPipelineInspectTokenRequest { + /// Token mint to inspect. + pub token_mint: std::string::String, + /// Optional custom timeframe in seconds for on-demand candle rebuild. + pub custom_timeframe_seconds: std::option::Option, +} + +/// Request payload for one pipeline inspection by pair id. +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoPipelineInspectPairRequest { + /// Pair id to inspect. + pub pair_id: i64, + /// Optional custom timeframe in seconds for on-demand candle rebuild. + pub custom_timeframe_seconds: std::option::Option, +} + +/// Request payload for one pipeline inspection by pool address. +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoPipelineInspectPoolRequest { + /// Pool address to inspect. + pub pool_address: std::string::String, + /// Optional custom timeframe in seconds for on-demand candle rebuild. + pub custom_timeframe_seconds: std::option::Option, +} + +/// Inspects one pair id through the persisted `kb_lib` pipeline state. +#[tauri::command] +pub(crate) async fn demo_pipeline_inspect_pair_id( + state: tauri::State<'_, crate::KbAppState>, + request: KbDemoPipelineInspectPairRequest, +) -> Result { + if request.pair_id <= 0 { + return Err("demo pipeline pair id must be > 0".to_string()); + } + let database = state.database.clone(); + let pairs_result = kb_lib::list_pairs(database.as_ref()).await; + let all_pairs = match pairs_result { + Ok(all_pairs) => all_pairs, + Err(error) => { + return Err(format!("cannot list pairs from database: {}", error)); + } + }; + let mut inspected_pair_option = std::option::Option::::None; + for pair in all_pairs { + let pair_id_option = pair.id; + let pair_id = match pair_id_option { + Some(pair_id) => pair_id, + None => continue, + }; + if pair_id == request.pair_id { + inspected_pair_option = Some(pair); + break; + } + } + let inspected_pair = match inspected_pair_option { + Some(inspected_pair) => inspected_pair, + None => { + return Err(format!( + "unknown pair id '{}' in local pipeline database '{}'", + request.pair_id, + state.database.database_url() + )); + } + }; + kb_demo_pipeline_build_pair_payload( + state.database.clone(), + state.database.database_url().to_string(), + inspected_pair, + request.custom_timeframe_seconds, + format!("pair:{}", request.pair_id), + ) + .await +} + +/// Inspects one pool address through the persisted `kb_lib` pipeline state. +#[tauri::command] +pub(crate) async fn demo_pipeline_inspect_pool_address( + state: tauri::State<'_, crate::KbAppState>, + request: KbDemoPipelineInspectPoolRequest, +) -> Result { + let pool_address = request.pool_address.trim().to_string(); + if pool_address.is_empty() { + return Err("demo pipeline pool address must not be empty".to_string()); + } + let database = state.database.clone(); + let pool_result = kb_lib::get_pool_by_address(database.as_ref(), pool_address.as_str()).await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => { + return Err(format!( + "cannot read pool address '{}' from database: {}", + pool_address, error + )); + } + }; + let pool = match pool_option { + Some(pool) => pool, + None => { + return Err(format!( + "unknown pool address '{}' in local pipeline database '{}'", + pool_address, + state.database.database_url() + )); + } + }; + let pool_id = match pool.id { + Some(pool_id) => pool_id, + None => { + return Err(format!( + "pool '{}' has no internal id", + pool.address + )); + } + }; + let pair_result = kb_lib::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => { + return Err(format!( + "cannot fetch pair by pool_id '{}' for pool '{}': {}", + pool_id, pool.address, error + )); + } + }; + let pair = match pair_option { + Some(pair) => pair, + None => { + return Err(format!( + "pool '{}' has no associated pair in local pipeline database", + pool.address + )); + } + }; + kb_demo_pipeline_build_pair_payload( + state.database.clone(), + state.database.database_url().to_string(), + pair, + request.custom_timeframe_seconds, + format!("pool:{}", pool_address), + ) + .await +} + +/// Inspects one token mint through the persisted `kb_lib` pipeline state. +#[tauri::command] +pub(crate) async fn demo_pipeline_inspect_token_mint( + state: tauri::State<'_, crate::KbAppState>, + request: KbDemoPipelineInspectTokenRequest, +) -> Result { + let token_mint = request.token_mint.trim().to_string(); + if token_mint.is_empty() { + return Err("demo pipeline token mint must not be empty".to_string()); + } + let database = state.database.clone(); + let token_result = kb_lib::get_token_by_mint(database.as_ref(), token_mint.as_str()).await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => { + return Err(format!( + "cannot read token mint '{}' from database: {}", + token_mint, error + )); + } + }; + let token = match token_option { + Some(token) => token, + None => { + return Err(format!( + "unknown token mint '{}' in local pipeline database '{}'", + token_mint, + state.database.database_url() + )); + } + }; + let token_id = match token.id { + Some(token_id) => token_id, + None => { + return Err(format!("token mint '{}' has no internal id", token.mint)); + } + }; + let pools_result = kb_lib::list_pools(database.as_ref()).await; + let all_pools = match pools_result { + Ok(all_pools) => all_pools, + Err(error) => { + return Err(format!("cannot list pools from database: {}", error)); + } + }; + let pairs_result = kb_lib::list_pairs(database.as_ref()).await; + let all_pairs = match pairs_result { + Ok(all_pairs) => all_pairs, + Err(error) => { + return Err(format!("cannot list pairs from database: {}", error)); + } + }; + let pool_listings_result = kb_lib::list_pool_listings(database.as_ref()).await; + let all_pool_listings = match pool_listings_result { + Ok(all_pool_listings) => all_pool_listings, + Err(error) => { + return Err(format!( + "cannot list pool listings from database: {}", + error + )); + } + }; + let mut pools = std::vec::Vec::::new(); + let mut pool_ids = std::collections::BTreeSet::::new(); + for pool in all_pools { + let pool_id = match pool.id { + Some(pool_id) => pool_id, + None => continue, + }; + let pool_tokens_result = + kb_lib::list_pool_tokens_by_pool_id(database.as_ref(), pool_id).await; + let pool_tokens = match pool_tokens_result { + Ok(pool_tokens) => pool_tokens, + Err(error) => { + return Err(format!( + "cannot list pool tokens for pool_id '{}': {}", + pool_id, error + )); + } + }; + let mut contains_token = false; + for pool_token in pool_tokens { + if pool_token.token_id == token_id { + contains_token = true; + break; + } + } + if contains_token { + pool_ids.insert(pool_id); + pools.push(pool); + } + } + let mut pairs = std::vec::Vec::::new(); + let mut pair_ids = std::collections::BTreeSet::::new(); + for pair in all_pairs { + let pair_id = match pair.id { + Some(pair_id) => pair_id, + None => continue, + }; + if pair.base_token_id == token_id || pair.quote_token_id == token_id { + pair_ids.insert(pair_id); + pairs.push(pair); + } + } + let mut pool_listings = std::vec::Vec::::new(); + for listing in all_pool_listings { + if pool_ids.contains(&listing.pool_id) { + pool_listings.push(listing); + } + } + let mut launch_attributions = std::vec::Vec::::new(); + for pool_id in &pool_ids { + let attributions_result = + kb_lib::list_launch_attributions_by_pool_id(database.as_ref(), *pool_id).await; + let attributions = match attributions_result { + Ok(attributions) => attributions, + Err(error) => { + return Err(format!( + "cannot list launch attributions for pool_id '{}': {}", + pool_id, error + )); + } + }; + for attribution in attributions { + launch_attributions.push(attribution); + } + } + let mut pool_origins = std::vec::Vec::::new(); + for pool_id in &pool_ids { + let pool_origin_result = + kb_lib::get_pool_origin_by_pool_id(database.as_ref(), *pool_id).await; + let pool_origin_option = match pool_origin_result { + Ok(pool_origin_option) => pool_origin_option, + Err(error) => { + return Err(format!( + "cannot fetch pool origin for pool_id '{}': {}", + pool_id, error + )); + } + }; + if let Some(pool_origin) = pool_origin_option { + pool_origins.push(pool_origin); + } + } + let mut wallet_holding_groups = std::vec::Vec::::new(); + let wallets_result = kb_lib::list_wallets(database.as_ref()).await; + let wallets = match wallets_result { + Ok(wallets) => wallets, + Err(error) => { + return Err(format!("cannot list wallets from database: {}", error)); + } + }; + for wallet in wallets { + let wallet_id = match wallet.id { + Some(wallet_id) => wallet_id, + None => continue, + }; + let holdings_result = + kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; + let holdings = match holdings_result { + Ok(holdings) => holdings, + Err(error) => { + return Err(format!( + "cannot list wallet holdings for wallet_id '{}': {}", + wallet_id, error + )); + } + }; + let mut filtered_holdings = std::vec::Vec::new(); + for holding in holdings { + if holding.token_id == token_id { + filtered_holdings.push(holding); + } + } + if !filtered_holdings.is_empty() { + let wallet_value_result = serde_json::to_value(&wallet); + let wallet_value = match wallet_value_result { + Ok(wallet_value) => wallet_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + let holdings_value_result = serde_json::to_value(&filtered_holdings); + let holdings_value = match holdings_value_result { + Ok(holdings_value) => holdings_value, + Err(error) => { + return Err(format!( + "cannot serialize holdings for wallet '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + wallet_holding_groups.push(serde_json::json!({ + "walletAddress": wallet.address, + "wallet": wallet_value, + "holdings": holdings_value + })); + } + } + let mut pair_metrics = std::vec::Vec::::new(); + let mut pair_candle_groups = std::vec::Vec::::new(); + let mut pair_analytic_signal_groups = std::vec::Vec::::new(); + let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); + let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; + if let Some(custom_timeframe_seconds) = request.custom_timeframe_seconds { + if custom_timeframe_seconds > 0 && !timeframes.contains(&custom_timeframe_seconds) { + timeframes.push(custom_timeframe_seconds); + } + } + for pair_id in &pair_ids { + let pair_metric_result = + kb_lib::get_pair_metric_by_pair_id(database.as_ref(), *pair_id).await; + let pair_metric_option = match pair_metric_result { + Ok(pair_metric_option) => pair_metric_option, + Err(error) => { + return Err(format!( + "cannot fetch pair metric for pair_id '{}': {}", + pair_id, error + )); + } + }; + if let Some(pair_metric) = pair_metric_option { + pair_metrics.push(pair_metric); + } + let pair_signals_result = + kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), *pair_id).await; + let pair_signals = match pair_signals_result { + Ok(pair_signals) => pair_signals, + Err(error) => { + return Err(format!( + "cannot list pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + let pair_signals_value_result = serde_json::to_value(&pair_signals); + let pair_signals_value = match pair_signals_value_result { + Ok(pair_signals_value) => pair_signals_value, + Err(error) => { + return Err(format!( + "cannot serialize pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + pair_analytic_signal_groups.push(serde_json::json!({ + "pairId": pair_id, + "signals": pair_signals_value + })); + for timeframe_seconds in &timeframes { + let candles_result = if *timeframe_seconds == 60 + || *timeframe_seconds == 300 + || *timeframe_seconds == 900 + || *timeframe_seconds == 3600 + { + kb_lib::list_pair_candles_by_pair_and_timeframe( + database.as_ref(), + *pair_id, + *timeframe_seconds, + ) + .await + } else { + query_service + .list_pair_candles(*pair_id, *timeframe_seconds, None, None, false) + .await + }; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => { + return Err(format!( + "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + let candles_value_result = serde_json::to_value(&candles); + let candles_value = match candles_value_result { + Ok(candles_value) => candles_value, + Err(error) => { + return Err(format!( + "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + pair_candle_groups.push(serde_json::json!({ + "pairId": pair_id, + "timeframeSeconds": timeframe_seconds, + "candles": candles_value + })); + } + } + let summary_value = serde_json::json!({ + "mode": "tokenMint", + "databaseUrl": state.database.database_url(), + "tokenMint": token.mint, + "tokenId": token_id, + "customTimeframeSeconds": request.custom_timeframe_seconds, + "poolCount": pools.len(), + "pairCount": pairs.len(), + "poolListingCount": pool_listings.len(), + "launchAttributionCount": launch_attributions.len(), + "poolOriginCount": pool_origins.len(), + "walletHoldingGroupCount": wallet_holding_groups.len(), + "pairMetricCount": pair_metrics.len(), + "pairCandleGroupCount": pair_candle_groups.len(), + "pairAnalyticSignalGroupCount": pair_analytic_signal_groups.len() + }); + let summary_json = kb_to_pretty_json(&summary_value, "summary")?; + let transaction_json = kb_to_pretty_json(&token, "token")?; + let decoded_events_json = kb_to_pretty_json(&pool_listings, "pool listings")?; + let pools_json = kb_to_pretty_json(&pools, "pools")?; + let pairs_json = kb_to_pretty_json(&pairs, "pairs")?; + let launch_attributions_json = kb_to_pretty_json(&launch_attributions, "launch attributions")?; + let pool_origins_json = kb_to_pretty_json(&pool_origins, "pool origins")?; + let wallets_json = kb_to_pretty_json(&wallet_holding_groups, "wallet holdings")?; + let trade_events_json = + kb_to_pretty_json(&std::vec::Vec::::new(), "trade events")?; + let pair_metrics_json = kb_to_pretty_json(&pair_metrics, "pair metrics")?; + let pair_candles_json = kb_to_pretty_json(&pair_candle_groups, "pair candles")?; + let pair_analytic_signals_json = + kb_to_pretty_json(&pair_analytic_signal_groups, "pair analytic signals")?; + Ok(KbDemoPipelineInspectPayload { + signature: token_mint, + summary_json, + transaction_json, + decoded_events_json, + pools_json, + pairs_json, + launch_attributions_json, + pool_origins_json, + wallets_json, + trade_events_json, + pair_metrics_json, + pair_candles_json, + pair_analytic_signals_json, + }) +} + +/// Opens the dedicated pipeline inspection window. +#[tauri::command] +pub(crate) fn open_demo_pipeline_window( + app_handle: tauri::AppHandle, +) -> Result<(), std::string::String> { + let existing_window_option = app_handle.get_webview_window("demo_pipeline"); + let demo_window = match existing_window_option { + Some(demo_window) => demo_window, + None => { + let builder = tauri::WebviewWindowBuilder::new( + &app_handle, + "demo_pipeline", + tauri::WebviewUrl::App("demo_pipeline.html".into()), + ) + .title("Demo Pipeline") + .inner_size(1480.0, 920.0) + .min_inner_size(1000.0, 700.0) + .center() + .visible(true) + .transparent(false) + .decorations(true); + let build_result = builder.build(); + match build_result { + Ok(window) => window, + Err(error) => { + return Err(format!("cannot create demo_pipeline window: {error:?}")); + } + } + } + }; + let show_result = demo_window.show(); + if let Err(error) = show_result { + return Err(format!("cannot show demo_pipeline window: {error:?}")); + } + let focus_result = demo_window.set_focus(); + if let Err(error) = focus_result { + return Err(format!("cannot focus demo_pipeline window: {error:?}")); + } + Ok(()) +} + +/// Inspects one transaction signature through the persisted `kb_lib` pipeline state. +#[tauri::command] +pub(crate) async fn demo_pipeline_inspect_signature( + state: tauri::State<'_, crate::KbAppState>, + request: KbDemoPipelineInspectRequest, +) -> Result { + let signature = request.signature.trim().to_string(); + if signature.is_empty() { + return Err("demo pipeline signature must not be empty".to_string()); + } + let database = state.database.clone(); + let transaction_result = + kb_lib::get_chain_transaction_by_signature(database.as_ref(), signature.as_str()).await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => { + return Err(format!( + "cannot read chain transaction '{}' from database: {}", + signature, error + )); + } + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => { + return Err(format!( + "unknown transaction signature '{}' in local pipeline database", + signature + )); + } + }; + let transaction_id = match transaction.id { + Some(transaction_id) => transaction_id, + None => { + return Err(format!("transaction '{}' has no internal id", signature)); + } + }; + let decoded_events_result = + kb_lib::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id).await; + let decoded_events = match decoded_events_result { + Ok(decoded_events) => decoded_events, + Err(error) => { + return Err(format!( + "cannot list decoded events for transaction '{}': {}", + signature, error + )); + } + }; + let mut decoded_event_ids = std::collections::BTreeSet::::new(); + let mut pool_ids = std::collections::BTreeSet::::new(); + let mut pair_ids = std::collections::BTreeSet::::new(); + let mut wallet_addresses = std::collections::BTreeSet::::new(); + let mut pools = std::vec::Vec::::new(); + let mut pairs = std::vec::Vec::::new(); + let mut launch_attributions = std::vec::Vec::::new(); + let mut pool_origins = std::vec::Vec::::new(); + for decoded_event in &decoded_events { + if let Some(decoded_event_id) = decoded_event.id { + decoded_event_ids.insert(decoded_event_id); + let launch_attribution_result = kb_lib::get_launch_attribution_by_decoded_event_id( + database.as_ref(), + decoded_event_id, + ) + .await; + let launch_attribution_option = match launch_attribution_result { + Ok(launch_attribution_option) => launch_attribution_option, + Err(error) => { + return Err(format!( + "cannot fetch launch attribution for decoded_event_id '{}': {}", + decoded_event_id, error + )); + } + }; + if let Some(launch_attribution) = launch_attribution_option { + launch_attributions.push(launch_attribution); + } + } + let payload_parse_result = + serde_json::from_str::(decoded_event.payload_json.as_str()); + let payload_value = match payload_parse_result { + Ok(payload_value) => payload_value, + Err(error) => { + return Err(format!( + "cannot parse decoded_event payload_json for '{}': {}", + signature, error + )); + } + }; + let extracted_wallets = kb_extract_wallet_addresses_from_value(&payload_value); + for wallet_address in extracted_wallets { + wallet_addresses.insert(wallet_address); + } + if let Some(token_a_mint) = decoded_event.token_a_mint.clone() { + let _ = token_a_mint; + } + if let Some(token_b_mint) = decoded_event.token_b_mint.clone() { + let _ = token_b_mint; + } + if let Some(pool_address) = decoded_event.pool_account.clone() { + let pool_result = + kb_lib::get_pool_by_address(database.as_ref(), pool_address.as_str()).await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => { + return Err(format!( + "cannot fetch pool by address '{}' for '{}': {}", + pool_address, signature, error + )); + } + }; + if let Some(pool) = pool_option { + let pool_id = match pool.id { + Some(pool_id) => pool_id, + None => { + return Err(format!("pool '{}' has no internal id", pool.address)); + } + }; + if !pool_ids.contains(&pool_id) { + pool_ids.insert(pool_id); + pools.push(pool.clone()); + let pool_origin_result = + kb_lib::get_pool_origin_by_pool_id(database.as_ref(), pool_id).await; + let pool_origin_option = match pool_origin_result { + Ok(pool_origin_option) => pool_origin_option, + Err(error) => { + return Err(format!( + "cannot fetch pool origin for pool_id '{}': {}", + pool_id, error + )); + } + }; + if let Some(pool_origin) = pool_origin_option { + pool_origins.push(pool_origin); + } + let pair_result = kb_lib::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => { + return Err(format!( + "cannot fetch pair by pool_id '{}': {}", + pool_id, error + )); + } + }; + if let Some(pair) = pair_option { + let pair_id = match pair.id { + Some(pair_id) => pair_id, + None => { + return Err(format!( + "pair for pool '{}' has no internal id", + pool_id + )); + } + }; + if !pair_ids.contains(&pair_id) { + pair_ids.insert(pair_id); + pairs.push(pair); + } + } + } + } + } + } + let mut wallet_contexts = std::vec::Vec::::new(); + let mut wallet_participation_count = 0_usize; + let mut wallet_holding_count = 0_usize; + for wallet_address in wallet_addresses { + let wallet_result = + kb_lib::get_wallet_by_address(database.as_ref(), wallet_address.as_str()).await; + let wallet_option = match wallet_result { + Ok(wallet_option) => wallet_option, + Err(error) => { + return Err(format!( + "cannot fetch wallet by address '{}': {}", + wallet_address, error + )); + } + }; + let wallet = match wallet_option { + Some(wallet) => wallet, + None => continue, + }; + let wallet_id = match wallet.id { + Some(wallet_id) => wallet_id, + None => { + return Err(format!("wallet '{}' has no internal id", wallet.address)); + } + }; + let participations_result = + kb_lib::list_wallet_participations_by_wallet_id(database.as_ref(), wallet_id).await; + let participations = match participations_result { + Ok(participations) => participations, + Err(error) => { + return Err(format!( + "cannot list wallet participations for wallet_id '{}': {}", + wallet_id, error + )); + } + }; + let mut filtered_participations = std::vec::Vec::new(); + for participation in participations { + let mut include_participation = false; + if participation.transaction_id == transaction_id { + include_participation = true; + } + if !include_participation { + if let Some(decoded_event_id) = participation.decoded_event_id { + if decoded_event_ids.contains(&decoded_event_id) { + include_participation = true; + } + } + } + if include_participation { + filtered_participations.push(participation); + } + } + let holdings_result = + kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; + let holdings = match holdings_result { + Ok(holdings) => holdings, + Err(error) => { + return Err(format!( + "cannot list wallet holdings for wallet_id '{}': {}", + wallet_id, error + )); + } + }; + let mut filtered_holdings = std::vec::Vec::new(); + for holding in holdings { + if holding.last_transaction_id == transaction_id { + filtered_holdings.push(holding); + } + } + wallet_participation_count += filtered_participations.len(); + wallet_holding_count += filtered_holdings.len(); + let wallet_value_result = serde_json::to_value(&wallet); + let wallet_value = match wallet_value_result { + Ok(wallet_value) => wallet_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + let participations_value_result = serde_json::to_value(&filtered_participations); + let participations_value = match participations_value_result { + Ok(participations_value) => participations_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet participations for '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + let holdings_value_result = serde_json::to_value(&filtered_holdings); + let holdings_value = match holdings_value_result { + Ok(holdings_value) => holdings_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet holdings for '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + wallet_contexts.push(serde_json::json!({ + "walletAddress": wallet.address, + "wallet": wallet_value, + "participations": participations_value, + "holdings": holdings_value + })); + } + let trade_events_result = + kb_lib::list_trade_events_by_transaction_id(database.as_ref(), transaction_id).await; + let transaction_trade_events = match trade_events_result { + Ok(transaction_trade_events) => transaction_trade_events, + Err(error) => { + return Err(format!( + "cannot list trade events for transaction_id '{}': {}", + transaction_id, error + )); + } + }; + let mut pair_metrics = std::vec::Vec::::new(); + let mut pair_candle_groups = std::vec::Vec::::new(); + let mut pair_candle_count = 0_usize; + let mut pair_analytic_signal_groups = std::vec::Vec::::new(); + let mut pair_analytic_signal_count = 0_usize; + let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); + let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; + if let Some(custom_timeframe_seconds) = request.custom_timeframe_seconds { + if custom_timeframe_seconds > 0 && !timeframes.contains(&custom_timeframe_seconds) { + timeframes.push(custom_timeframe_seconds); + } + } + for pair_id in &pair_ids { + let pair_metric_result = + kb_lib::get_pair_metric_by_pair_id(database.as_ref(), *pair_id).await; + let pair_metric_option = match pair_metric_result { + Ok(pair_metric_option) => pair_metric_option, + Err(error) => { + return Err(format!( + "cannot fetch pair metric for pair_id '{}': {}", + pair_id, error + )); + } + }; + if let Some(pair_metric) = pair_metric_option { + pair_metrics.push(pair_metric); + } + let pair_signals_result = + kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), *pair_id).await; + let pair_signals = match pair_signals_result { + Ok(pair_signals) => pair_signals, + Err(error) => { + return Err(format!( + "cannot list pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + pair_analytic_signal_count += pair_signals.len(); + let pair_signals_value_result = serde_json::to_value(&pair_signals); + let pair_signals_value = match pair_signals_value_result { + Ok(pair_signals_value) => pair_signals_value, + Err(error) => { + return Err(format!( + "cannot serialize pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + pair_analytic_signal_groups.push(serde_json::json!({ + "pairId": pair_id, + "signals": pair_signals_value + })); + for timeframe_seconds in &timeframes { + let candles_result = if *timeframe_seconds == 60 + || *timeframe_seconds == 300 + || *timeframe_seconds == 900 + || *timeframe_seconds == 3600 + { + kb_lib::list_pair_candles_by_pair_and_timeframe( + database.as_ref(), + *pair_id, + *timeframe_seconds, + ) + .await + } else { + query_service + .list_pair_candles(*pair_id, *timeframe_seconds, None, None, false) + .await + }; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => { + return Err(format!( + "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + pair_candle_count += candles.len(); + let candles_value_result = serde_json::to_value(&candles); + let candles_value = match candles_value_result { + Ok(candles_value) => candles_value, + Err(error) => { + return Err(format!( + "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + pair_candle_groups.push(serde_json::json!({ + "pairId": pair_id, + "timeframeSeconds": timeframe_seconds, + "candles": candles_value + })); + } + } + let summary_value = serde_json::json!({ + "signature": signature, + "customTimeframeSeconds": request.custom_timeframe_seconds, + "decodedEventCount": decoded_events.len(), + "poolCount": pools.len(), + "pairCount": pairs.len(), + "launchAttributionCount": launch_attributions.len(), + "poolOriginCount": pool_origins.len(), + "walletCount": wallet_contexts.len(), + "walletParticipationCount": wallet_participation_count, + "walletHoldingCount": wallet_holding_count, + "transactionTradeEventCount": transaction_trade_events.len(), + "pairMetricCount": pair_metrics.len(), + "pairCandleGroupCount": pair_candle_groups.len(), + "pairCandleCount": pair_candle_count, + "pairAnalyticSignalGroupCount": pair_analytic_signal_groups.len(), + "pairAnalyticSignalCount": pair_analytic_signal_count + }); + let summary_json_result = kb_to_pretty_json(&summary_value, "summary"); + let summary_json = match summary_json_result { + Ok(summary_json) => summary_json, + Err(error) => return Err(error), + }; + let transaction_json_result = kb_to_pretty_json(&transaction, "transaction"); + let transaction_json = match transaction_json_result { + Ok(transaction_json) => transaction_json, + Err(error) => return Err(error), + }; + let decoded_events_json_result = kb_to_pretty_json(&decoded_events, "decoded events"); + let decoded_events_json = match decoded_events_json_result { + Ok(decoded_events_json) => decoded_events_json, + Err(error) => return Err(error), + }; + let pools_json_result = kb_to_pretty_json(&pools, "pools"); + let pools_json = match pools_json_result { + Ok(pools_json) => pools_json, + Err(error) => return Err(error), + }; + let pairs_json_result = kb_to_pretty_json(&pairs, "pairs"); + let pairs_json = match pairs_json_result { + Ok(pairs_json) => pairs_json, + Err(error) => return Err(error), + }; + let launch_attributions_json_result = + kb_to_pretty_json(&launch_attributions, "launch attributions"); + let launch_attributions_json = match launch_attributions_json_result { + Ok(launch_attributions_json) => launch_attributions_json, + Err(error) => return Err(error), + }; + let pool_origins_json_result = kb_to_pretty_json(&pool_origins, "pool origins"); + let pool_origins_json = match pool_origins_json_result { + Ok(pool_origins_json) => pool_origins_json, + Err(error) => return Err(error), + }; + let wallets_json_result = kb_to_pretty_json(&wallet_contexts, "wallet contexts"); + let wallets_json = match wallets_json_result { + Ok(wallets_json) => wallets_json, + Err(error) => return Err(error), + }; + let trade_events_json_result = kb_to_pretty_json(&transaction_trade_events, "trade events"); + let trade_events_json = match trade_events_json_result { + Ok(trade_events_json) => trade_events_json, + Err(error) => return Err(error), + }; + let pair_metrics_json_result = kb_to_pretty_json(&pair_metrics, "pair metrics"); + let pair_metrics_json = match pair_metrics_json_result { + Ok(pair_metrics_json) => pair_metrics_json, + Err(error) => return Err(error), + }; + let pair_candles_json_result = kb_to_pretty_json(&pair_candle_groups, "pair candles"); + let pair_candles_json = match pair_candles_json_result { + Ok(pair_candles_json) => pair_candles_json, + Err(error) => return Err(error), + }; + let pair_analytic_signals_json_result = + kb_to_pretty_json(&pair_analytic_signal_groups, "pair analytic signals"); + let pair_analytic_signals_json = match pair_analytic_signals_json_result { + Ok(pair_analytic_signals_json) => pair_analytic_signals_json, + Err(error) => return Err(error), + }; + Ok(KbDemoPipelineInspectPayload { + signature, + summary_json, + transaction_json, + decoded_events_json, + pools_json, + pairs_json, + launch_attributions_json, + pool_origins_json, + wallets_json, + trade_events_json, + pair_metrics_json, + pair_candles_json, + pair_analytic_signals_json, + }) +} + +async fn kb_demo_pipeline_build_pair_payload( + database: std::sync::Arc, + database_url: std::string::String, + inspected_pair: kb_lib::KbPairDto, + custom_timeframe_seconds: std::option::Option, + object_key: std::string::String, +) -> Result { + let pair_id = match inspected_pair.id { + Some(pair_id) => pair_id, + None => { + return Err("inspected pair has no internal id".to_string()); + } + }; + let pools_result = kb_lib::list_pools(database.as_ref()).await; + let all_pools = match pools_result { + Ok(all_pools) => all_pools, + Err(error) => { + return Err(format!("cannot list pools from database: {}", error)); + } + }; + let mut inspected_pool_option = std::option::Option::::None; + for pool in all_pools { + let pool_id_option = pool.id; + let pool_id = match pool_id_option { + Some(pool_id) => pool_id, + None => continue, + }; + if pool_id == inspected_pair.pool_id { + inspected_pool_option = Some(pool); + break; + } + } + let inspected_pool = match inspected_pool_option { + Some(inspected_pool) => inspected_pool, + None => { + return Err(format!( + "pair '{}' references unknown pool_id '{}'", + pair_id, inspected_pair.pool_id + )); + } + }; + let pool_listings_result = kb_lib::list_pool_listings(database.as_ref()).await; + let all_pool_listings = match pool_listings_result { + Ok(all_pool_listings) => all_pool_listings, + Err(error) => { + return Err(format!( + "cannot list pool listings from database: {}", + error + )); + } + }; + let mut pool_listings = std::vec::Vec::::new(); + for listing in all_pool_listings { + if listing.pool_id == inspected_pair.pool_id { + pool_listings.push(listing); + } + } + let launch_attributions_result = + kb_lib::list_launch_attributions_by_pool_id(database.as_ref(), inspected_pair.pool_id) + .await; + let launch_attributions = match launch_attributions_result { + Ok(launch_attributions) => launch_attributions, + Err(error) => { + return Err(format!( + "cannot list launch attributions for pool_id '{}': {}", + inspected_pair.pool_id, error + )); + } + }; + let pool_origin_result = + kb_lib::get_pool_origin_by_pool_id(database.as_ref(), inspected_pair.pool_id).await; + let pool_origin_option = match pool_origin_result { + Ok(pool_origin_option) => pool_origin_option, + Err(error) => { + return Err(format!( + "cannot fetch pool origin for pool_id '{}': {}", + inspected_pair.pool_id, error + )); + } + }; + let trade_events_result = + kb_lib::list_trade_events_by_pair_id(database.as_ref(), pair_id).await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => { + return Err(format!( + "cannot list trade events for pair_id '{}': {}", + pair_id, error + )); + } + }; + let pair_metric_result = kb_lib::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await; + let pair_metric_option = match pair_metric_result { + Ok(pair_metric_option) => pair_metric_option, + Err(error) => { + return Err(format!( + "cannot fetch pair metric for pair_id '{}': {}", + pair_id, error + )); + } + }; + let pair_signals_result = + kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), pair_id).await; + let pair_signals = match pair_signals_result { + Ok(pair_signals) => pair_signals, + Err(error) => { + return Err(format!( + "cannot list pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); + let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; + if let Some(custom_timeframe_seconds_value) = custom_timeframe_seconds { + if custom_timeframe_seconds_value > 0 + && !timeframes.contains(&custom_timeframe_seconds_value) + { + timeframes.push(custom_timeframe_seconds_value); + } + } + let mut pair_candle_groups = std::vec::Vec::::new(); + for timeframe_seconds in &timeframes { + let candles_result = if *timeframe_seconds == 60 + || *timeframe_seconds == 300 + || *timeframe_seconds == 900 + || *timeframe_seconds == 3600 + { + kb_lib::list_pair_candles_by_pair_and_timeframe( + database.as_ref(), + pair_id, + *timeframe_seconds, + ) + .await + } else { + query_service + .list_pair_candles(pair_id, *timeframe_seconds, None, None, false) + .await + }; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => { + return Err(format!( + "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + let candles_value_result = serde_json::to_value(&candles); + let candles_value = match candles_value_result { + Ok(candles_value) => candles_value, + Err(error) => { + return Err(format!( + "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", + pair_id, timeframe_seconds, error + )); + } + }; + pair_candle_groups.push(serde_json::json!({ + "pairId": pair_id, + "timeframeSeconds": timeframe_seconds, + "candles": candles_value + })); + } + let wallets_result = kb_lib::list_wallets(database.as_ref()).await; + let wallets = match wallets_result { + Ok(wallets) => wallets, + Err(error) => { + return Err(format!("cannot list wallets from database: {}", error)); + } + }; + let mut wallet_contexts = std::vec::Vec::::new(); + for wallet in wallets { + let wallet_id = match wallet.id { + Some(wallet_id) => wallet_id, + None => continue, + }; + let participations_result = + kb_lib::list_wallet_participations_by_wallet_id(database.as_ref(), wallet_id).await; + let participations = match participations_result { + Ok(participations) => participations, + Err(error) => { + return Err(format!( + "cannot list wallet participations for wallet_id '{}': {}", + wallet_id, error + )); + } + }; + let mut filtered_participations = std::vec::Vec::new(); + for participation in participations { + let mut include_participation = false; + + if let Some(participation_pair_id) = participation.pair_id { + if participation_pair_id == pair_id { + include_participation = true; + } + } + if !include_participation { + if let Some(participation_pool_id) = participation.pool_id { + if participation_pool_id == inspected_pair.pool_id { + include_participation = true; + } + } + } + if include_participation { + filtered_participations.push(participation); + } + } + let holdings_result = + kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; + let holdings = match holdings_result { + Ok(holdings) => holdings, + Err(error) => { + return Err(format!( + "cannot list wallet holdings for wallet_id '{}': {}", + wallet_id, error + )); + } + }; + let mut filtered_holdings = std::vec::Vec::new(); + for holding in holdings { + let mut include_holding = false; + if let Some(last_pair_id) = holding.last_pair_id { + if last_pair_id == pair_id { + include_holding = true; + } + } + if !include_holding { + if let Some(last_pool_id) = holding.last_pool_id { + if last_pool_id == inspected_pair.pool_id { + include_holding = true; + } + } + } + if include_holding { + filtered_holdings.push(holding); + } + } + if !filtered_participations.is_empty() || !filtered_holdings.is_empty() { + let wallet_value_result = serde_json::to_value(&wallet); + let wallet_value = match wallet_value_result { + Ok(wallet_value) => wallet_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + let participations_value_result = serde_json::to_value(&filtered_participations); + let participations_value = match participations_value_result { + Ok(participations_value) => participations_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet participations for '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + let holdings_value_result = serde_json::to_value(&filtered_holdings); + let holdings_value = match holdings_value_result { + Ok(holdings_value) => holdings_value, + Err(error) => { + return Err(format!( + "cannot serialize wallet holdings for '{}' to JSON value: {}", + wallet.address, error + )); + } + }; + wallet_contexts.push(serde_json::json!({ + "walletAddress": wallet.address, + "wallet": wallet_value, + "participations": participations_value, + "holdings": holdings_value + })); + } + } + let mut pair_metrics = std::vec::Vec::::new(); + if let Some(pair_metric) = pair_metric_option { + pair_metrics.push(pair_metric); + } + let pair_signals_value_result = serde_json::to_value(&pair_signals); + let pair_signals_value = match pair_signals_value_result { + Ok(pair_signals_value) => pair_signals_value, + Err(error) => { + return Err(format!( + "cannot serialize pair analytic signals for pair_id '{}': {}", + pair_id, error + )); + } + }; + let pair_signal_groups = vec![serde_json::json!({ + "pairId": pair_id, + "signals": pair_signals_value + })]; + let summary_value = serde_json::json!({ + "mode": "pairOrPool", + "databaseUrl": database_url, + "objectKey": object_key, + "pairId": pair_id, + "poolId": inspected_pair.pool_id, + "poolAddress": inspected_pool.address, + "customTimeframeSeconds": custom_timeframe_seconds, + "poolListingCount": pool_listings.len(), + "launchAttributionCount": launch_attributions.len(), + "hasPoolOrigin": pool_origin_option.is_some(), + "walletContextCount": wallet_contexts.len(), + "tradeEventCount": trade_events.len(), + "pairMetricCount": pair_metrics.len(), + "pairCandleGroupCount": pair_candle_groups.len(), + "pairAnalyticSignalCount": pair_signals.len() + }); + let summary_json_result = kb_to_pretty_json(&summary_value, "summary"); + let summary_json = match summary_json_result { + Ok(summary_json) => summary_json, + Err(error) => return Err(error), + }; + let transaction_json_result = kb_to_pretty_json(&inspected_pair, "pair"); + let transaction_json = match transaction_json_result { + Ok(transaction_json) => transaction_json, + Err(error) => return Err(error), + }; + let decoded_events_json_result = kb_to_pretty_json(&pool_listings, "pool listings"); + let decoded_events_json = match decoded_events_json_result { + Ok(decoded_events_json) => decoded_events_json, + Err(error) => return Err(error), + }; + let pools_json_result = kb_to_pretty_json(&vec![inspected_pool], "pool"); + let pools_json = match pools_json_result { + Ok(pools_json) => pools_json, + Err(error) => return Err(error), + }; + let pairs_json_result = kb_to_pretty_json(&vec![inspected_pair], "pair"); + let pairs_json = match pairs_json_result { + Ok(pairs_json) => pairs_json, + Err(error) => return Err(error), + }; + let launch_attributions_json_result = + kb_to_pretty_json(&launch_attributions, "launch attributions"); + let launch_attributions_json = match launch_attributions_json_result { + Ok(launch_attributions_json) => launch_attributions_json, + Err(error) => return Err(error), + }; + let pool_origins_json_result = kb_to_pretty_json(&pool_origin_option, "pool origin"); + let pool_origins_json = match pool_origins_json_result { + Ok(pool_origins_json) => pool_origins_json, + Err(error) => return Err(error), + }; + let wallets_json_result = kb_to_pretty_json(&wallet_contexts, "wallet contexts"); + let wallets_json = match wallets_json_result { + Ok(wallets_json) => wallets_json, + Err(error) => return Err(error), + }; + let trade_events_json_result = kb_to_pretty_json(&trade_events, "trade events"); + let trade_events_json = match trade_events_json_result { + Ok(trade_events_json) => trade_events_json, + Err(error) => return Err(error), + }; + let pair_metrics_json_result = kb_to_pretty_json(&pair_metrics, "pair metrics"); + let pair_metrics_json = match pair_metrics_json_result { + Ok(pair_metrics_json) => pair_metrics_json, + Err(error) => return Err(error), + }; + let pair_candles_json_result = kb_to_pretty_json(&pair_candle_groups, "pair candles"); + let pair_candles_json = match pair_candles_json_result { + Ok(pair_candles_json) => pair_candles_json, + Err(error) => return Err(error), + }; + let pair_analytic_signals_json_result = + kb_to_pretty_json(&pair_signal_groups, "pair analytic signals"); + let pair_analytic_signals_json = match pair_analytic_signals_json_result { + Ok(pair_analytic_signals_json) => pair_analytic_signals_json, + Err(error) => return Err(error), + }; + Ok(KbDemoPipelineInspectPayload { + signature: object_key, + summary_json, + transaction_json, + decoded_events_json, + pools_json, + pairs_json, + launch_attributions_json, + pool_origins_json, + wallets_json, + trade_events_json, + pair_metrics_json, + pair_candles_json, + pair_analytic_signals_json, + }) +} + +fn kb_to_pretty_json( + value: &T, + label: &str, +) -> Result { + let json_result = serde_json::to_string_pretty(value); + match json_result { + Ok(json) => Ok(json), + Err(error) => Err(format!("cannot serialize {} as JSON: {}", label, error)), + } +} + +fn kb_extract_wallet_addresses_from_value( + value: &serde_json::Value, +) -> std::collections::BTreeSet { + let mut addresses = std::collections::BTreeSet::::new(); + let candidate_keys = ["creator", "poolCreator", "payer", "funder", "owner", "user"]; + kb_extract_wallet_addresses_from_value_inner(value, &candidate_keys, &mut addresses); + addresses +} + +fn kb_extract_wallet_addresses_from_value_inner( + value: &serde_json::Value, + candidate_keys: &[&str], + addresses: &mut std::collections::BTreeSet, +) { + if let Some(object) = value.as_object() { + for candidate_key in candidate_keys { + let direct_option = object.get(*candidate_key); + if let Some(direct) = direct_option { + let text_option = direct.as_str(); + if let Some(text) = text_option { + if !text.is_empty() { + addresses.insert(text.to_string()); + } + } + } + } + for nested_value in object.values() { + kb_extract_wallet_addresses_from_value_inner(nested_value, candidate_keys, addresses); + } + return; + } + if let Some(array) = value.as_array() { + for nested_value in array { + kb_extract_wallet_addresses_from_value_inner(nested_value, candidate_keys, addresses); + } + } +} diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs index cc83ad9..c02d4bb 100644 --- a/kb_app/src/lib.rs +++ b/kb_app/src/lib.rs @@ -10,6 +10,7 @@ #![warn(missing_docs)] mod demo_http; +mod demo_pipeline; mod demo_ws; mod demo_ws_manager; mod splash; @@ -36,16 +37,18 @@ impl KbWsRuntimeState { /// Shared application state stored inside Tauri. struct KbAppState { config: kb_lib::KbConfig, + database: std::sync::Arc, ws_runtime: tokio::sync::Mutex, demo_ws_runtime: std::sync::Arc>, - demo_ws_manager_runtime: std::sync::Arc>, + demo_ws_manager_runtime: + std::sync::Arc>, ws_manager: std::sync::Arc, http_pool: kb_lib::HttpEndpointPool, } /// Runs the desktop application. #[cfg_attr(mobile, tauri::mobile_entry_point)] -pub fn run() { +pub async fn run() -> Result<(), kb_lib::KbError> { let config_path = kb_lib::KbConfig::default_path(); let config_result = kb_lib::KbConfig::load_from_path(&config_path); let config = match config_result { @@ -56,20 +59,20 @@ pub fn run() { config_path.display(), error ); - return; + return Err(error); } }; let prepare_result = config.prepare_filesystem(); if let Err(error) = prepare_result { eprintln!("kb_app filesystem preparation error: {error}"); - return; + return Err(error); } let tracing_guard_result = kb_lib::init_tracing(&config.logging); let _tracing_guard = match tracing_guard_result { Ok(guard) => guard, Err(error) => { eprintln!("kb_app tracing initialization error: {error}"); - return; + return Err(error); } }; tracing::info!( @@ -77,6 +80,11 @@ pub fn run() { environment = %config.app.environment, "starting desktop application" ); + let database_result = kb_lib::KbDatabase::connect_and_initialize(&config.database).await; + let database = match database_result { + Ok(database) => database, + Err(error) => return Err(error), + }; let http_pool_result = kb_lib::HttpEndpointPool::from_config(&config); let http_pool = match http_pool_result { Ok(http_pool) => http_pool, @@ -95,6 +103,7 @@ pub fn run() { }; let app_state = KbAppState { config: config.clone(), + database: std::sync::Arc::new(database), ws_runtime: tokio::sync::Mutex::new(KbWsRuntimeState::new()), demo_ws_runtime: std::sync::Arc::new(tokio::sync::Mutex::new( crate::demo_ws::KbDemoWsRuntimeState::new(), @@ -128,6 +137,11 @@ pub fn run() { crate::demo_ws_manager::demo_ws_manager_stop_all, crate::demo_ws_manager::demo_ws_manager_start_role, crate::demo_ws_manager::demo_ws_manager_stop_role, + crate::demo_pipeline::open_demo_pipeline_window, + crate::demo_pipeline::demo_pipeline_inspect_signature, + crate::demo_pipeline::demo_pipeline_inspect_token_mint, + crate::demo_pipeline::demo_pipeline_inspect_pair_id, + crate::demo_pipeline::demo_pipeline_inspect_pool_address, ]); tauri_builder = tauri_builder.plugin(tracing_builder.build::()); tauri_builder = tauri_builder.setup(|app| { @@ -202,7 +216,11 @@ pub fn run() { let run_result = tauri_builder.run(tauri::generate_context!()); if let Err(error) = run_result { tracing::error!("error while running tauri application: {error:?}"); + return Err(kb_lib::KbError::InvalidState(format!( + "error while running tauri application: {error:?}" + ))); } + Ok(()) } fn emit_splash_order( diff --git a/kb_app/src/main.rs b/kb_app/src/main.rs index 312fa62..1693038 100644 --- a/kb_app/src/main.rs +++ b/kb_app/src/main.rs @@ -14,9 +14,7 @@ use fs2::FileExt; /// Entrypoint of the kb app binary. #[tokio::main] -async -fn main() -> std::process::ExitCode -{ +async fn main() -> std::process::ExitCode { let mut lock_path = std::env::temp_dir(); lock_path.push("com_khadhroony_solana_rust.lock"); let lock_file = match std::fs::File::create(lock_path) { @@ -24,7 +22,7 @@ fn main() -> std::process::ExitCode Err(_err) => { eprintln!("Cannot create lock!"); std::process::exit(1); - }, + } }; // trying to aquire an exclusive lock if lock_file.try_lock_exclusive().is_err() { @@ -34,13 +32,17 @@ fn main() -> std::process::ExitCode if rustls::crypto::CryptoProvider::get_default().is_none() { let provider_result = rustls::crypto::aws_lc_rs::default_provider().install_default(); match provider_result { - Ok(()) => {}, + Ok(()) => {} Err(error) => { eprintln!("kb_app rustls provider init error: {:?}", error); return std::process::ExitCode::FAILURE; - }, + } } - } - kb_app_lib::run(); + } + let run_result = kb_app_lib::run().await; + if let Err(error) = run_result { + eprintln!("application error: {}", error); + std::process::exit(1); + } std::process::ExitCode::SUCCESS } diff --git a/kb_app/tauri.conf.json b/kb_app/tauri.conf.json index 833d2de..6209cd2 100644 --- a/kb_app/tauri.conf.json +++ b/kb_app/tauri.conf.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.tauri.app/config/2", "productName": "kb-bapp", - "version": "0.6.6", + "version": "0.7.22", "identifier": "com.sasedev.kb-app", "build": { "beforeDevCommand": "npm run dev", @@ -78,6 +78,20 @@ "create": false, "transparent": false, "decorations": true + }, + { + "label": "demo_pipeline", + "url": "demo_pipeline.html", + "title": "Demo Pipeline", + "width": 1480, + "height": 920, + "minWidth": 1000, + "minHeight": 700, + "center": true, + "visible": false, + "create": false, + "transparent": false, + "decorations": true } ], "security": { diff --git a/kb_lib/src/config.rs b/kb_lib/src/config.rs index e73d485..c6678ff 100644 --- a/kb_lib/src/config.rs +++ b/kb_lib/src/config.rs @@ -152,7 +152,7 @@ impl KbConfig { wallets_directory.display() ))); } - let sqlite_path = self.data.sqlite_path_buf(); + let sqlite_path = self.database.sqlite.path_buf(); let sqlite_parent_option = sqlite_path.parent(); if let Some(sqlite_parent) = sqlite_parent_option { if !sqlite_parent.as_os_str().is_empty() { @@ -509,6 +509,13 @@ pub struct KbSqliteDatabaseConfig { pub use_wal: bool, } +impl KbSqliteDatabaseConfig { + /// Returns the resolved SQLite database path. + pub fn path_buf(&self) -> std::path::PathBuf { + kb_resolve_workspace_relative_path(&self.path) + } +} + /// Database configuration. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/kb_lib/src/db/sqlite.rs b/kb_lib/src/db/sqlite.rs index 7c43d1e..0bac4c6 100644 --- a/kb_lib/src/db/sqlite.rs +++ b/kb_lib/src/db/sqlite.rs @@ -12,7 +12,8 @@ pub(crate) fn sqlite_database_url_from_config( "database.sqlite.path must not be empty".to_string(), )); } - Ok(format!("sqlite://{}", path)) + let database_path = config.sqlite.path_buf(); + Ok(format!("sqlite://{}", database_path.display())) } /// Opens a SQLite pool according to configuration. @@ -30,7 +31,7 @@ pub(crate) async fn connect_sqlite( "database.sqlite.max_connections must be > 0".to_string(), )); } - let database_path = std::path::Path::new(path); + let database_path = config.sqlite.path_buf(); let parent_option = database_path.parent(); if let Some(parent) = parent_option { if !parent.as_os_str().is_empty() { @@ -45,7 +46,7 @@ pub(crate) async fn connect_sqlite( } } let mut connect_options = sqlx::sqlite::SqliteConnectOptions::new() - .filename(database_path) + .filename(&database_path) .create_if_missing(config.sqlite.create_if_missing) .foreign_keys(true) .busy_timeout(std::time::Duration::from_millis( @@ -61,7 +62,7 @@ pub(crate) async fn connect_sqlite( Ok(pool) => Ok(pool), Err(error) => Err(crate::KbError::Db(format!( "cannot open sqlite database '{}': {}", - path, error + database_path.display(), error ))), } }