diff --git a/CHANGELOG.md b/CHANGELOG.md
index b71e049..6a9b263 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -54,3 +54,4 @@
0.7.21 - Ajout d’une première couche de signaux analytiques enrichis par paire avec persistance dédiée et détection de first trade, trade burst, buy/sell imbalance, price jump et volume spike
0.7.22 - Ajout d’une première fenêtre `Demo Pipeline` dans `kb_app` pour l’inspection en lecture seule du pipeline `0.7.x`, avec recherche par signature, token mint, pair id ou pool address, affichage structuré des transactions résolues, événements DEX décodés, pools, paires, listings, launch origins, pool origins, wallets et holdings observés, trade events, pair metrics, candles et signaux analytiques déjà persistés, ainsi que conservation d’une instance partagée de la base SQLite pour éviter la réouverture et la réinitialisation du schéma à chaque commande UI
0.7.23 - Ajout du pilotage UI du backfill historique ciblé par `token mint` dans `kb_app`, avec saisie du rôle HTTP et des limites de signatures, affichage du résumé de backfill, réinspection automatique du token dans `Demo Pipeline` lorsque des objets persistés sont effectivement reconstruits, et gestion explicite du cas où le backfill réussit sans matérialiser de token exploitable dans la base locale
+0.7.24 - Ajout de l’affichage graphique des candles / OHLCV dans `kb_app` via `echarts`, avec sélection de paire et de timeframe, rendu chandelier + volume, et prise en charge des candles matérialisées ou régénérées à la demande depuis `Demo Pipeline`
diff --git a/Cargo.toml b/Cargo.toml
index 1d5993a..a8e2b5e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
-version = "0.7.23"
+version = "0.7.24"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"
diff --git a/ROADMAP.md b/ROADMAP.md
index 6b89a9d..c38f0ac 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -686,16 +686,14 @@ Réalisé :
- gestion explicite du cas où le backfill réussit sans matérialiser de token exploitable dans la base locale.
### 6.056. Version `0.7.24` — `kb_app` : visualisation candles / OHLCV
-Objectif : fournir une vue graphique exploitable des candles via `echarts`.
+Réalisé :
-À faire :
-
-- ajouter une vue de sélection de paire,
-- permettre le choix du timeframe,
-- lire les candles matérialisées pour les timeframes usuels,
-- permettre la régénération à la demande pour un timeframe arbitraire,
-- afficher les chandeliers, les volumes et la navigation temporelle,
-- préparer l’affichage d’overlays analytiques.
+- ajout d’un affichage graphique des candles / OHLCV dans `kb_app` via `echarts`,
+- sélection dynamique de la paire inspectée,
+- sélection dynamique du timeframe disponible,
+- affichage conjoint des chandeliers OHLC et du volume,
+- prise en charge des candles matérialisées et des candles régénérées à la demande pour un timeframe custom,
+- intégration du rendu graphique directement dans `Demo Pipeline`.
### 6.057. Version `0.7.25` — `kb_app` : overlays analytiques
Objectif : rendre visibles les signaux analytiques directement sur les graphes et vues de marché.
diff --git a/kb_app/capabilities/default.json b/kb_app/capabilities/default.json
index 2e2236c..bce06c4 100644
--- a/kb_app/capabilities/default.json
+++ b/kb_app/capabilities/default.json
@@ -8,7 +8,8 @@
"demo_ws",
"demo_http",
"demo_ws_manager",
- "demo_pipeline"
+ "demo_pipeline",
+ "demo_pipeline2"
],
"permissions": [
"core:default",
diff --git a/kb_app/frontend/demo_pipeline.html b/kb_app/frontend/demo_pipeline.html
index df9ce4c..e022b06 100644
--- a/kb_app/frontend/demo_pipeline.html
+++ b/kb_app/frontend/demo_pipeline.html
@@ -71,19 +71,12 @@
Inspecter token
-
+
Pair id
-
+
+
+ Pool address à backfill
+
+
+
+
+ Pool signatures
+
+
+
+
+
+ Backfill pool
+
+
+
@@ -184,7 +160,7 @@
-
+
Dernier backfill token
@@ -192,6 +168,36 @@
+
+
+
+
+
Candles / OHLCV
+
+ Aucun jeu de candles chargé.
+
+
+
+
+
+ Pair
+
+ Aucune
+
+
+
+ Timeframe
+
+ Aucun
+
+
+
+
+
+
+
+
+
@@ -78,6 +81,9 @@
Ouvrir Demo Pipeline
+
+ Ouvrir Demo Pipeline 2
+
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPayload.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPayload.ts
new file mode 100644
index 0000000..996ccd1
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPayload.ts
@@ -0,0 +1,27 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+import type { KbDemoPipeline2CatalogPayload } from "./KbDemoPipeline2CatalogPayload";
+
+/**
+ * Shared backfill response payload.
+ */
+export type KbDemoPipeline2BackfillPayload = {
+/**
+ * Object key used by the backfill.
+ */
+objectKey: string,
+/**
+ * Mode: `tokenMint` or `poolAddress`.
+ */
+mode: string,
+/**
+ * HTTP role used.
+ */
+httpRole: string,
+/**
+ * Pretty JSON summary.
+ */
+summaryJson: string,
+/**
+ * Refreshed local catalog after backfill.
+ */
+catalog: KbDemoPipeline2CatalogPayload, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPoolRequest.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPoolRequest.ts
new file mode 100644
index 0000000..4b564de
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillPoolRequest.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Request payload for pool backfill.
+ */
+export type KbDemoPipeline2BackfillPoolRequest = {
+/**
+ * Pool address to backfill.
+ */
+poolAddress: string,
+/**
+ * Optional HTTP role.
+ */
+httpRole: string | null,
+/**
+ * Limit for signatures fetched from the pool.
+ */
+poolSignatureLimit: number, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillTokenRequest.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillTokenRequest.ts
new file mode 100644
index 0000000..49954ce
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2BackfillTokenRequest.ts
@@ -0,0 +1,22 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Request payload for token backfill.
+ */
+export type KbDemoPipeline2BackfillTokenRequest = {
+/**
+ * Token mint to backfill.
+ */
+tokenMint: string,
+/**
+ * Optional HTTP role.
+ */
+httpRole: string | null,
+/**
+ * Limit for signatures fetched from the mint.
+ */
+mintSignatureLimit: number,
+/**
+ * Limit for signatures fetched from each discovered pool.
+ */
+poolSignatureLimit: number, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2CatalogPayload.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2CatalogPayload.ts
new file mode 100644
index 0000000..6c42ec5
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2CatalogPayload.ts
@@ -0,0 +1,25 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+import type { KbDemoPipeline2PairItem } from "./KbDemoPipeline2PairItem";
+import type { KbDemoPipeline2PoolItem } from "./KbDemoPipeline2PoolItem";
+import type { KbDemoPipeline2TokenItem } from "./KbDemoPipeline2TokenItem";
+
+/**
+ * Full local catalog payload.
+ */
+export type KbDemoPipeline2CatalogPayload = {
+/**
+ * Open database URL.
+ */
+databaseUrl: string,
+/**
+ * Observed token list.
+ */
+tokens: Array,
+/**
+ * Known pool list.
+ */
+pools: Array,
+/**
+ * Known pair list.
+ */
+pairs: Array, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesPayload.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesPayload.ts
new file mode 100644
index 0000000..dbb8fa8
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesPayload.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Candle payload returned to the UI.
+ */
+export type KbDemoPipeline2PairCandlesPayload = {
+/**
+ * Pair id.
+ */
+pairId: number,
+/**
+ * Timeframe in seconds.
+ */
+timeframeSeconds: number,
+/**
+ * Pretty JSON array of candles.
+ */
+candlesJson: string, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesRequest.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesRequest.ts
new file mode 100644
index 0000000..db8bb13
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairCandlesRequest.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Request payload for pair candles.
+ */
+export type KbDemoPipeline2PairCandlesRequest = {
+/**
+ * Pair id to load.
+ */
+pairId: number,
+/**
+ * Timeframe in seconds.
+ */
+timeframeSeconds: number,
+/**
+ * Whether materialized candles should be preferred when available.
+ */
+preferMaterialized: boolean, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2PairItem.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairItem.ts
new file mode 100644
index 0000000..efb3529
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2PairItem.ts
@@ -0,0 +1,30 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * One pair item for the local catalog.
+ */
+export type KbDemoPipeline2PairItem = {
+/**
+ * Internal pair id.
+ */
+pairId: number,
+/**
+ * Related pool address.
+ */
+poolAddress: string,
+/**
+ * Optional pair symbol.
+ */
+symbol: string | null,
+/**
+ * Optional DEX code.
+ */
+dexCode: string | null,
+/**
+ * Optional local trade count.
+ */
+tradeCount: number | null,
+/**
+ * Optional local last price.
+ */
+lastPriceQuotePerBase: number | null, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2PoolItem.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2PoolItem.ts
new file mode 100644
index 0000000..6258727
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2PoolItem.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * One pool item for the local catalog.
+ */
+export type KbDemoPipeline2PoolItem = {
+/**
+ * Pool address.
+ */
+poolAddress: string,
+/**
+ * Optional internal pair id when known.
+ */
+pairId: number | null,
+/**
+ * Optional DEX code.
+ */
+dexCode: string | null, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipeline2TokenItem.ts b/kb_app/frontend/ts/bindings/KbDemoPipeline2TokenItem.ts
new file mode 100644
index 0000000..5b94a30
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipeline2TokenItem.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * One token item for the local catalog.
+ */
+export type KbDemoPipeline2TokenItem = {
+/**
+ * Token mint.
+ */
+mint: string,
+/**
+ * Optional token symbol.
+ */
+symbol: string | null,
+/**
+ * Optional token name.
+ */
+name: string | null, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolPayload.ts b/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolPayload.ts
new file mode 100644
index 0000000..e5f51aa
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolPayload.ts
@@ -0,0 +1,22 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Response payload for one pool backfill launched from `kb_app`.
+ */
+export type KbDemoPipelineBackfillPoolPayload = {
+/**
+ * Backfilled pool address.
+ */
+poolAddress: string,
+/**
+ * HTTP role used during backfill.
+ */
+httpRole: string,
+/**
+ * Pretty JSON summary returned by `KbTokenBackfillService::backfill_pool_by_address`.
+ */
+backfillJson: string,
+/**
+ * Whether the pool exists in persisted pool objects after backfill.
+ */
+poolPersistedAfterBackfill: boolean, };
diff --git a/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolRequest.ts b/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolRequest.ts
new file mode 100644
index 0000000..99e56bc
--- /dev/null
+++ b/kb_app/frontend/ts/bindings/KbDemoPipelineBackfillPoolRequest.ts
@@ -0,0 +1,18 @@
+// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
+
+/**
+ * Request payload for one pool backfill launched from `kb_app`.
+ */
+export type KbDemoPipelineBackfillPoolRequest = {
+/**
+ * Pool address to backfill.
+ */
+poolAddress: string,
+/**
+ * HTTP role used to select one endpoint in the pool.
+ */
+httpRole: string | null,
+/**
+ * Maximum number of signatures fetched from the pool address.
+ */
+poolSignatureLimit: number, };
diff --git a/kb_app/frontend/ts/demo_pipeline.ts b/kb_app/frontend/ts/demo_pipeline.ts
index 8a5e27b..b5f6a0e 100644
--- a/kb_app/frontend/ts/demo_pipeline.ts
+++ b/kb_app/frontend/ts/demo_pipeline.ts
@@ -12,10 +12,391 @@ import { KbDemoPipelineInspectPairRequest } from './bindings/KbDemoPipelineInspe
import { KbDemoPipelineInspectPoolRequest } from './bindings/KbDemoPipelineInspectPoolRequest.ts';
import { KbDemoPipelineBackfillTokenRequest } from './bindings/KbDemoPipelineBackfillTokenRequest.ts';
import { KbDemoPipelineBackfillTokenPayload } from './bindings/KbDemoPipelineBackfillTokenPayload.ts';
+import { KbDemoPipelineBackfillPoolRequest } from './bindings/KbDemoPipelineBackfillPoolRequest.ts';
+import { KbDemoPipelineBackfillPoolPayload } from './bindings/KbDemoPipelineBackfillPoolPayload.ts';
+import * as echarts from "echarts";
(window as Window & typeof globalThis & { bootstrap?: typeof bootstrap }).bootstrap = bootstrap;
(window as Window & typeof globalThis & { ResizeObserver?: typeof ResizeObserver }).ResizeObserver = ResizeObserver;
+interface DemoPipelinePairCandle {
+ id: number | null;
+ pair_id: number;
+ timeframe_seconds: number;
+ bucket_start_unix: number;
+ bucket_end_unix: number;
+ open_price_quote_per_base: number;
+ high_price_quote_per_base: number;
+ low_price_quote_per_base: number;
+ close_price_quote_per_base: number;
+ trade_count: number;
+ buy_count: number;
+ sell_count: number;
+ base_volume_raw: string | null;
+ quote_volume_raw: string | null;
+ first_trade_signature: string | null;
+ last_trade_signature: string | null;
+ created_at: string;
+ updated_at: string;
+}
+
+interface DemoPipelinePairCandleGroup {
+ pairId: number;
+ timeframeSeconds: number;
+ candles: DemoPipelinePairCandle[];
+}
+
+function parsePairCandleGroups(rawJson: string): DemoPipelinePairCandleGroup[] {
+ if (rawJson.trim() === "") {
+ return [];
+ }
+
+ try {
+ const parsed = JSON.parse(rawJson) as unknown;
+ if (!Array.isArray(parsed)) {
+ return [];
+ }
+
+ const groups: DemoPipelinePairCandleGroup[] = [];
+
+ for (const value of parsed) {
+ if (typeof value !== "object" || value === null) {
+ continue;
+ }
+
+ const maybeGroup = value as {
+ pairId?: unknown;
+ timeframeSeconds?: unknown;
+ candles?: unknown;
+ };
+
+ if (
+ typeof maybeGroup.pairId !== "number" ||
+ typeof maybeGroup.timeframeSeconds !== "number" ||
+ !Array.isArray(maybeGroup.candles)
+ ) {
+ continue;
+ }
+
+ groups.push({
+ pairId: maybeGroup.pairId,
+ timeframeSeconds: maybeGroup.timeframeSeconds,
+ candles: maybeGroup.candles as DemoPipelinePairCandle[],
+ });
+ }
+
+ return groups;
+ } catch {
+ return [];
+ }
+}
+
+function formatTimeframeLabel(timeframeSeconds: number): string {
+ if (timeframeSeconds % 3600 === 0) {
+ return `${timeframeSeconds / 3600}h`;
+ }
+ if (timeframeSeconds % 60 === 0) {
+ return `${timeframeSeconds / 60}m`;
+ }
+ return `${timeframeSeconds}s`;
+}
+
+function parseRawVolume(text: string | null, fallback: number): number {
+ if (text === null || text.trim() === "") {
+ return fallback;
+ }
+
+ const parsed = Number.parseFloat(text);
+ if (Number.isNaN(parsed)) {
+ return fallback;
+ }
+
+ return parsed;
+}
+
+function setEmptyCandlesChart(
+ chart: echarts.ECharts,
+ chartMeta: HTMLElement,
+ message: string,
+): void {
+ chartMeta.textContent = message;
+
+ chart.setOption({
+ animation: false,
+ title: {
+ text: message,
+ left: "center",
+ top: "middle",
+ textStyle: {
+ fontSize: 14,
+ fontWeight: "normal",
+ },
+ },
+ tooltip: {},
+ xAxis: { show: false, type: "category", data: [] },
+ yAxis: { show: false, type: "value" },
+ series: [],
+ }, true);
+}
+
+function refreshCandlesSelectors(
+ groups: DemoPipelinePairCandleGroup[],
+ pairSelect: HTMLSelectElement,
+ timeframeSelect: HTMLSelectElement,
+): void {
+ const currentPairValue = pairSelect.value;
+ const currentTimeframeValue = timeframeSelect.value;
+
+ const uniquePairs = Array.from(new Set(groups.map((group) => group.pairId))).sort((left, right) => left - right);
+
+ pairSelect.innerHTML = "";
+ if (uniquePairs.length === 0) {
+ const option = document.createElement("option");
+ option.value = "";
+ option.textContent = "Aucune";
+ pairSelect.appendChild(option);
+
+ timeframeSelect.innerHTML = "";
+ const tfOption = document.createElement("option");
+ tfOption.value = "";
+ tfOption.textContent = "Aucun";
+ timeframeSelect.appendChild(tfOption);
+ return;
+ }
+
+ for (const pairId of uniquePairs) {
+ const option = document.createElement("option");
+ option.value = String(pairId);
+ option.textContent = `Pair #${pairId}`;
+ if (option.value === currentPairValue) {
+ option.selected = true;
+ }
+ pairSelect.appendChild(option);
+ }
+
+ if (pairSelect.value === "" && uniquePairs.length > 0) {
+ pairSelect.value = String(uniquePairs[0]);
+ }
+
+ const selectedPairId = Number.parseInt(pairSelect.value, 10);
+ const pairGroups = groups
+ .filter((group) => group.pairId === selectedPairId)
+ .sort((left, right) => left.timeframeSeconds - right.timeframeSeconds);
+
+ timeframeSelect.innerHTML = "";
+
+ if (pairGroups.length === 0) {
+ const option = document.createElement("option");
+ option.value = "";
+ option.textContent = "Aucun";
+ timeframeSelect.appendChild(option);
+ return;
+ }
+
+ for (const group of pairGroups) {
+ const option = document.createElement("option");
+ option.value = String(group.timeframeSeconds);
+ option.textContent = formatTimeframeLabel(group.timeframeSeconds);
+ if (option.value === currentTimeframeValue) {
+ option.selected = true;
+ }
+ timeframeSelect.appendChild(option);
+ }
+
+ if (timeframeSelect.value === "" && pairGroups.length > 0) {
+ timeframeSelect.value = String(pairGroups[0].timeframeSeconds);
+ }
+}
+
+function renderSelectedCandlesChart(
+ chart: echarts.ECharts,
+ chartMeta: HTMLElement,
+ groups: DemoPipelinePairCandleGroup[],
+ pairSelect: HTMLSelectElement,
+ timeframeSelect: HTMLSelectElement,
+): void {
+ if (groups.length === 0) {
+ setEmptyCandlesChart(chart, chartMeta, "Aucune candle disponible.");
+ return;
+ }
+
+ const selectedPairId = Number.parseInt(pairSelect.value, 10);
+ const selectedTimeframe = Number.parseInt(timeframeSelect.value, 10);
+
+ if (Number.isNaN(selectedPairId) || Number.isNaN(selectedTimeframe)) {
+ setEmptyCandlesChart(chart, chartMeta, "Sélection de paire/timeframe invalide.");
+ return;
+ }
+
+ const group = groups.find(
+ (value) =>
+ value.pairId === selectedPairId &&
+ value.timeframeSeconds === selectedTimeframe,
+ );
+
+ if (!group || group.candles.length === 0) {
+ setEmptyCandlesChart(
+ chart,
+ chartMeta,
+ `Aucune candle pour la pair #${selectedPairId} en ${formatTimeframeLabel(selectedTimeframe)}.`,
+ );
+ return;
+ }
+
+ const candles = [...group.candles].sort(
+ (left, right) => left.bucket_start_unix - right.bucket_start_unix,
+ );
+
+ const categoryData = candles.map((candle) =>
+ new Date(candle.bucket_start_unix * 1000).toLocaleString("fr-CH", {
+ hour12: false,
+ year: "2-digit",
+ month: "2-digit",
+ day: "2-digit",
+ hour: "2-digit",
+ minute: "2-digit",
+ }),
+ );
+
+ const ohlcData = candles.map((candle) => [
+ candle.open_price_quote_per_base,
+ candle.close_price_quote_per_base,
+ candle.low_price_quote_per_base,
+ candle.high_price_quote_per_base,
+ ]);
+
+ const volumeData = candles.map((candle) =>
+ parseRawVolume(candle.quote_volume_raw, candle.trade_count),
+ );
+
+ chartMeta.textContent =
+ `Pair #${selectedPairId} • ${formatTimeframeLabel(selectedTimeframe)} • ${candles.length} candles`;
+
+ chart.setOption(
+ {
+ animation: false,
+ legend: {
+ data: ["OHLC", "Volume"],
+ top: 0,
+ },
+ tooltip: {
+ trigger: "axis",
+ axisPointer: {
+ type: "cross",
+ },
+ },
+ axisPointer: {
+ link: [{ xAxisIndex: "all" }],
+ },
+ grid: [
+ { left: 60, right: 24, top: 40, height: "58%" },
+ { left: 60, right: 24, top: "74%", height: "16%" },
+ ],
+ xAxis: [
+ {
+ type: "category",
+ data: categoryData,
+ boundaryGap: true,
+ axisLine: { onZero: false },
+ splitLine: { show: false },
+ min: "dataMin",
+ max: "dataMax",
+ },
+ {
+ type: "category",
+ gridIndex: 1,
+ data: categoryData,
+ boundaryGap: true,
+ axisLine: { onZero: false },
+ axisTick: { show: false },
+ splitLine: { show: false },
+ axisLabel: { show: false },
+ min: "dataMin",
+ max: "dataMax",
+ },
+ ],
+ yAxis: [
+ {
+ scale: true,
+ splitArea: { show: false },
+ },
+ {
+ gridIndex: 1,
+ scale: true,
+ splitNumber: 2,
+ },
+ ],
+ dataZoom: [
+ {
+ type: "inside",
+ xAxisIndex: [0, 1],
+ start: 0,
+ end: 100,
+ },
+ {
+ show: true,
+ type: "slider",
+ xAxisIndex: [0, 1],
+ bottom: 6,
+ start: 0,
+ end: 100,
+ },
+ ],
+ series: [
+ {
+ name: "OHLC",
+ type: "candlestick",
+ data: ohlcData,
+ },
+ {
+ name: "Volume",
+ type: "bar",
+ xAxisIndex: 1,
+ yAxisIndex: 1,
+ data: volumeData,
+ },
+ ],
+ },
+ true,
+ );
+}
+
+function applyInspectionPayload(
+ payload: KbDemoPipelineInspectPayload,
+ summaryTextarea: HTMLTextAreaElement,
+ transactionTextarea: HTMLTextAreaElement,
+ decodedEventsTextarea: HTMLTextAreaElement,
+ poolsTextarea: HTMLTextAreaElement,
+ pairsTextarea: HTMLTextAreaElement,
+ launchAttributionsTextarea: HTMLTextAreaElement,
+ poolOriginsTextarea: HTMLTextAreaElement,
+ walletsTextarea: HTMLTextAreaElement,
+ tradeEventsTextarea: HTMLTextAreaElement,
+ pairMetricsTextarea: HTMLTextAreaElement,
+ pairCandlesTextarea: HTMLTextAreaElement,
+ pairAnalyticSignalsTextarea: HTMLTextAreaElement,
+ chart: echarts.ECharts,
+ chartMeta: HTMLElement,
+ pairSelect: HTMLSelectElement,
+ timeframeSelect: HTMLSelectElement,
+): void {
+ summaryTextarea.value = payload.summaryJson;
+ transactionTextarea.value = payload.transactionJson;
+ decodedEventsTextarea.value = payload.decodedEventsJson;
+ poolsTextarea.value = payload.poolsJson;
+ pairsTextarea.value = payload.pairsJson;
+ launchAttributionsTextarea.value = payload.launchAttributionsJson;
+ poolOriginsTextarea.value = payload.poolOriginsJson;
+ walletsTextarea.value = payload.walletsJson;
+ tradeEventsTextarea.value = payload.tradeEventsJson;
+ pairMetricsTextarea.value = payload.pairMetricsJson;
+ pairCandlesTextarea.value = payload.pairCandlesJson;
+ pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson;
+
+ const groups = parsePairCandleGroups(payload.pairCandlesJson);
+ refreshCandlesSelectors(groups, pairSelect, timeframeSelect);
+ renderSelectedCandlesChart(chart, chartMeta, groups, pairSelect, timeframeSelect);
+}
function appendLogLine(textarea: HTMLTextAreaElement, line: string): void {
const now = new Date();
@@ -167,14 +548,30 @@ document.addEventListener("DOMContentLoaded", async () => {
const backfillTokenButton = document.querySelector("#demoPipelineBackfillTokenButton");
const backfillTextarea = document.querySelector("#demoPipelineBackfillTextarea");
+ const chartPairSelect = document.querySelector("#demoPipelineChartPairSelect");
+ const chartTimeframeSelect = document.querySelector("#demoPipelineChartTimeframeSelect");
+ const candlesChartElement = document.querySelector("#demoPipelineCandlesChart");
+ const candlesChartMeta = document.querySelector("#demoPipelineCandlesChartMeta");
+
+ const backfillPoolAddressInput = document.querySelector("#demoPipelineBackfillPoolAddressInput");
+ const backfillPoolOnlyLimitInput = document.querySelector("#demoPipelineBackfillPoolOnlyLimitInput");
+ const backfillPoolButton = document.querySelector("#demoPipelineBackfillPoolButton");
+
if (
+ !chartPairSelect ||
+ !chartTimeframeSelect ||
+ !candlesChartElement ||
+ !candlesChartMeta ||
!backfillTokenMintInput ||
!backfillHttpRoleInput ||
!backfillMintLimitInput ||
!backfillPoolLimitInput ||
!backfillTokenButton ||
!backfillTextarea ||
+ !backfillPoolAddressInput ||
+ !backfillPoolOnlyLimitInput ||
+ !backfillPoolButton ||
!pairIdInput ||
!inspectPairButton ||
!poolAddressInput ||
@@ -204,6 +601,36 @@ document.addEventListener("DOMContentLoaded", async () => {
return;
}
+ const candlesChart = echarts.init(candlesChartElement);
+ setEmptyCandlesChart(candlesChart, candlesChartMeta, "Aucune candle disponible.");
+
+ window.addEventListener("resize", () => {
+ candlesChart.resize();
+ });
+
+ chartPairSelect.addEventListener("change", () => {
+ const groups = parsePairCandleGroups(pairCandlesTextarea.value);
+ refreshCandlesSelectors(groups, chartPairSelect, chartTimeframeSelect);
+ renderSelectedCandlesChart(
+ candlesChart,
+ candlesChartMeta,
+ groups,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
+ });
+
+ chartTimeframeSelect.addEventListener("change", () => {
+ const groups = parsePairCandleGroups(pairCandlesTextarea.value);
+ renderSelectedCandlesChart(
+ candlesChart,
+ candlesChartMeta,
+ groups,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
+ });
+
clearButton.addEventListener("click", () => {
clearInspection(
backfillTextarea,
@@ -229,9 +656,110 @@ document.addEventListener("DOMContentLoaded", async () => {
tokenMintInput.value = "";
pairIdInput.value = "";
poolAddressInput.value = "";
+ backfillPoolAddressInput.value = "";
+ backfillPoolOnlyLimitInput.value = "50";
+ chartPairSelect.innerHTML = `Aucune `;
+ chartTimeframeSelect.innerHTML = `Aucun `;
+ setEmptyCandlesChart(candlesChart, candlesChartMeta, "Aucune candle disponible.");
appendLogLine(logTextarea, "[ui] inspection state cleared");
});
+ backfillPoolButton.addEventListener("click", async () => {
+ const poolAddress = backfillPoolAddressInput.value.trim();
+ if (poolAddress === "") {
+ appendLogLine(logTextarea, "[ui] backfill pool address is required");
+ return;
+ }
+
+ const poolSignatureLimit = readPositiveIntegerInput(
+ backfillPoolOnlyLimitInput,
+ logTextarea,
+ "poolSignatureLimit",
+ );
+ if (poolSignatureLimit === undefined) {
+ return;
+ }
+
+ const httpRoleText = backfillHttpRoleInput.value.trim();
+ const httpRole = httpRoleText === "" ? null : httpRoleText;
+
+ appendLogLine(
+ logTextarea,
+ `[ui] launching pool backfill for '${poolAddress}' with role '${httpRole ?? "history_backfill"}' (pool=${poolSignatureLimit})`,
+ );
+
+ const request: KbDemoPipelineBackfillPoolRequest = {
+ poolAddress,
+ httpRole,
+ poolSignatureLimit,
+ };
+
+ try {
+ const payload = await invoke(
+ "demo_pipeline_backfill_pool_address",
+ { request },
+ );
+
+ backfillTextarea.value = payload.backfillJson;
+ appendLogLine(
+ logTextarea,
+ `[ui] pool backfill completed for '${payload.poolAddress}' with role '${payload.httpRole}'`,
+ );
+
+ if (!payload.poolPersistedAfterBackfill) {
+ appendLogLine(
+ logTextarea,
+ `[ui] backfill completed but pool '${payload.poolAddress}' is still absent from persisted pool objects; automatic pool inspection skipped`,
+ );
+ return;
+ }
+
+ const inspectRequest: KbDemoPipelineInspectPoolRequest = {
+ poolAddress: payload.poolAddress,
+ customTimeframeSeconds: null,
+ };
+
+ try {
+ const inspectPayload = await invoke(
+ "demo_pipeline_inspect_pool_address",
+ { request: inspectRequest },
+ );
+
+ applyInspectionPayload(
+ inspectPayload,
+ summaryTextarea,
+ transactionTextarea,
+ decodedEventsTextarea,
+ poolsTextarea,
+ pairsTextarea,
+ launchAttributionsTextarea,
+ poolOriginsTextarea,
+ walletsTextarea,
+ tradeEventsTextarea,
+ pairMetricsTextarea,
+ pairCandlesTextarea,
+ pairAnalyticSignalsTextarea,
+ candlesChart,
+ candlesChartMeta,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
+
+ appendLogLine(
+ logTextarea,
+ `[ui] pool inspection refreshed after backfill for '${payload.poolAddress}'`,
+ );
+ } catch (error) {
+ appendLogLine(
+ logTextarea,
+ `[ui] backfill completed but automatic pool inspection failed for '${payload.poolAddress}': ${String(error)}`,
+ );
+ }
+ } catch (error) {
+ appendLogLine(logTextarea, `[ui] pool backfill error: ${String(error)}`);
+ }
+ });
+
clearLogButton.addEventListener("click", () => {
logTextarea.value = "";
});
@@ -267,18 +795,25 @@ document.addEventListener("DOMContentLoaded", async () => {
try {
const payload = await invoke("demo_pipeline_inspect_signature", { request });
- summaryTextarea.value = payload.summaryJson;
- transactionTextarea.value = payload.transactionJson;
- decodedEventsTextarea.value = payload.decodedEventsJson;
- poolsTextarea.value = payload.poolsJson;
- pairsTextarea.value = payload.pairsJson;
- launchAttributionsTextarea.value = payload.launchAttributionsJson;
- poolOriginsTextarea.value = payload.poolOriginsJson;
- walletsTextarea.value = payload.walletsJson;
- tradeEventsTextarea.value = payload.tradeEventsJson;
- pairMetricsTextarea.value = payload.pairMetricsJson;
- pairCandlesTextarea.value = payload.pairCandlesJson;
- pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson;
+ applyInspectionPayload(
+ payload,
+ summaryTextarea,
+ transactionTextarea,
+ decodedEventsTextarea,
+ poolsTextarea,
+ pairsTextarea,
+ launchAttributionsTextarea,
+ poolOriginsTextarea,
+ walletsTextarea,
+ tradeEventsTextarea,
+ pairMetricsTextarea,
+ pairCandlesTextarea,
+ pairAnalyticSignalsTextarea,
+ candlesChart,
+ candlesChartMeta,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
appendLogLine(logTextarea, `[ui] inspection completed for '${payload.signature}'`);
} catch (error) {
@@ -317,18 +852,25 @@ document.addEventListener("DOMContentLoaded", async () => {
try {
const payload = await invoke("demo_pipeline_inspect_token_mint", { request });
- summaryTextarea.value = payload.summaryJson;
- transactionTextarea.value = payload.transactionJson;
- decodedEventsTextarea.value = payload.decodedEventsJson;
- poolsTextarea.value = payload.poolsJson;
- pairsTextarea.value = payload.pairsJson;
- launchAttributionsTextarea.value = payload.launchAttributionsJson;
- poolOriginsTextarea.value = payload.poolOriginsJson;
- walletsTextarea.value = payload.walletsJson;
- tradeEventsTextarea.value = payload.tradeEventsJson;
- pairMetricsTextarea.value = payload.pairMetricsJson;
- pairCandlesTextarea.value = payload.pairCandlesJson;
- pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson;
+ applyInspectionPayload(
+ payload,
+ summaryTextarea,
+ transactionTextarea,
+ decodedEventsTextarea,
+ poolsTextarea,
+ pairsTextarea,
+ launchAttributionsTextarea,
+ poolOriginsTextarea,
+ walletsTextarea,
+ tradeEventsTextarea,
+ pairMetricsTextarea,
+ pairCandlesTextarea,
+ pairAnalyticSignalsTextarea,
+ candlesChart,
+ candlesChartMeta,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
appendLogLine(logTextarea, `[ui] token inspection completed for '${payload.signature}'`);
} catch (error) {
@@ -367,18 +909,25 @@ document.addEventListener("DOMContentLoaded", async () => {
try {
const payload = await invoke("demo_pipeline_inspect_pair_id", { request });
- summaryTextarea.value = payload.summaryJson;
- transactionTextarea.value = payload.transactionJson;
- decodedEventsTextarea.value = payload.decodedEventsJson;
- poolsTextarea.value = payload.poolsJson;
- pairsTextarea.value = payload.pairsJson;
- launchAttributionsTextarea.value = payload.launchAttributionsJson;
- poolOriginsTextarea.value = payload.poolOriginsJson;
- walletsTextarea.value = payload.walletsJson;
- tradeEventsTextarea.value = payload.tradeEventsJson;
- pairMetricsTextarea.value = payload.pairMetricsJson;
- pairCandlesTextarea.value = payload.pairCandlesJson;
- pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson;
+ applyInspectionPayload(
+ payload,
+ summaryTextarea,
+ transactionTextarea,
+ decodedEventsTextarea,
+ poolsTextarea,
+ pairsTextarea,
+ launchAttributionsTextarea,
+ poolOriginsTextarea,
+ walletsTextarea,
+ tradeEventsTextarea,
+ pairMetricsTextarea,
+ pairCandlesTextarea,
+ pairAnalyticSignalsTextarea,
+ candlesChart,
+ candlesChartMeta,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
appendLogLine(logTextarea, `[ui] pair inspection completed for '${payload.signature}'`);
} catch (error) {
@@ -411,18 +960,25 @@ document.addEventListener("DOMContentLoaded", async () => {
try {
const payload = await invoke("demo_pipeline_inspect_pool_address", { request });
- summaryTextarea.value = payload.summaryJson;
- transactionTextarea.value = payload.transactionJson;
- decodedEventsTextarea.value = payload.decodedEventsJson;
- poolsTextarea.value = payload.poolsJson;
- pairsTextarea.value = payload.pairsJson;
- launchAttributionsTextarea.value = payload.launchAttributionsJson;
- poolOriginsTextarea.value = payload.poolOriginsJson;
- walletsTextarea.value = payload.walletsJson;
- tradeEventsTextarea.value = payload.tradeEventsJson;
- pairMetricsTextarea.value = payload.pairMetricsJson;
- pairCandlesTextarea.value = payload.pairCandlesJson;
- pairAnalyticSignalsTextarea.value = payload.pairAnalyticSignalsJson;
+ applyInspectionPayload(
+ payload,
+ summaryTextarea,
+ transactionTextarea,
+ decodedEventsTextarea,
+ poolsTextarea,
+ pairsTextarea,
+ launchAttributionsTextarea,
+ poolOriginsTextarea,
+ walletsTextarea,
+ tradeEventsTextarea,
+ pairMetricsTextarea,
+ pairCandlesTextarea,
+ pairAnalyticSignalsTextarea,
+ candlesChart,
+ candlesChartMeta,
+ chartPairSelect,
+ chartTimeframeSelect,
+ );
appendLogLine(logTextarea, `[ui] pool inspection completed for '${payload.signature}'`);
} catch (error) {
diff --git a/kb_app/frontend/ts/demo_pipeline2.ts b/kb_app/frontend/ts/demo_pipeline2.ts
new file mode 100644
index 0000000..e79763a
--- /dev/null
+++ b/kb_app/frontend/ts/demo_pipeline2.ts
@@ -0,0 +1,559 @@
+// file: kb_app/frontend/ts/demo_pipeline2.ts
+
+import * as bootstrap from "bootstrap";
+import "simplebar";
+import ResizeObserver from "resize-observer-polyfill";
+import * as echarts from "echarts";
+import { invoke } from "@tauri-apps/api/core";
+import { debug, takeoverConsole } from "@fltsci/tauri-plugin-tracing";
+
+import type { KbDemoPipeline2CatalogPayload } from "./bindings/KbDemoPipeline2CatalogPayload.ts";
+import type { KbDemoPipeline2BackfillTokenRequest } from "./bindings/KbDemoPipeline2BackfillTokenRequest.ts";
+import type { KbDemoPipeline2BackfillPoolRequest } from "./bindings/KbDemoPipeline2BackfillPoolRequest.ts";
+import type { KbDemoPipeline2BackfillPayload } from "./bindings/KbDemoPipeline2BackfillPayload.ts";
+import type { KbDemoPipeline2PairCandlesRequest } from "./bindings/KbDemoPipeline2PairCandlesRequest.ts";
+import type { KbDemoPipeline2PairCandlesPayload } from "./bindings/KbDemoPipeline2PairCandlesPayload.ts";
+
+(window as Window & typeof globalThis & { bootstrap?: typeof bootstrap }).bootstrap = bootstrap;
+(window as Window & typeof globalThis & { ResizeObserver?: typeof ResizeObserver }).ResizeObserver = ResizeObserver;
+
+interface PairCandle {
+ id: number | null;
+ pair_id: number;
+ timeframe_seconds: number;
+ bucket_start_unix: number;
+ bucket_end_unix: number;
+ open_price_quote_per_base: number;
+ high_price_quote_per_base: number;
+ low_price_quote_per_base: number;
+ close_price_quote_per_base: number;
+ trade_count: number;
+ buy_count: number;
+ sell_count: number;
+ base_volume_raw: string | null;
+ quote_volume_raw: string | null;
+ first_trade_signature: string | null;
+ last_trade_signature: string | null;
+ created_at: string;
+ updated_at: string;
+}
+
+function appendLogLine(textarea: HTMLTextAreaElement, line: string): void {
+ const now = new Date();
+ const timestamp = now.toLocaleTimeString("fr-CH", { hour12: false });
+
+ const lines = textarea.value === "" ? [] : textarea.value.split("\n");
+ lines.push(`[${timestamp}] ${line}`);
+
+ textarea.value = lines.slice(-300).join("\n");
+ textarea.scrollTop = textarea.scrollHeight;
+}
+
+function setEmptyChart(
+ chart: echarts.ECharts,
+ chartMeta: HTMLElement,
+ message: string,
+): void {
+ chartMeta.textContent = message;
+
+ chart.setOption({
+ animation: false,
+ title: {
+ text: message,
+ left: "center",
+ top: "middle",
+ textStyle: {
+ fontSize: 14,
+ fontWeight: "normal",
+ },
+ },
+ tooltip: {},
+ xAxis: { show: false, type: "category", data: [] },
+ yAxis: { show: false, type: "value" },
+ series: [],
+ }, true);
+}
+
+function readPositiveIntegerInput(
+ input: HTMLInputElement,
+ logTextarea: HTMLTextAreaElement,
+ label: string,
+): number | undefined {
+ const text = input.value.trim();
+ if (text === "") {
+ appendLogLine(logTextarea, `[ui] ${label} is required`);
+ return undefined;
+ }
+
+ const parsed = Number.parseInt(text, 10);
+ if (Number.isNaN(parsed) || parsed <= 0) {
+ appendLogLine(logTextarea, `[ui] invalid ${label} '${text}'`);
+ return undefined;
+ }
+
+ return parsed;
+}
+
+function refreshPairSelect(
+ catalog: KbDemoPipeline2CatalogPayload,
+ select: HTMLSelectElement,
+): void {
+ const previousValue = select.value;
+
+ select.innerHTML = "";
+ const emptyOption = document.createElement("option");
+ emptyOption.value = "";
+ emptyOption.textContent = "Aucune";
+ select.appendChild(emptyOption);
+
+ for (const pair of catalog.pairs) {
+ const option = document.createElement("option");
+ option.value = pair.pairId.toString();
+ option.textContent = `#${pair.pairId.toString()} ${pair.symbol ?? ""} ${pair.poolAddress}`.trim();
+ if (option.value === previousValue) {
+ option.selected = true;
+ }
+ select.appendChild(option);
+ }
+}
+
+function renderCatalogTextareas(
+ catalog: KbDemoPipeline2CatalogPayload,
+ tokensTextarea: HTMLTextAreaElement,
+ poolsTextarea: HTMLTextAreaElement,
+ pairsTextarea: HTMLTextAreaElement,
+): void {
+ tokensTextarea.value = JSON.stringify(catalog.tokens, null, 2);
+ poolsTextarea.value = JSON.stringify(catalog.pools, null, 2);
+ pairsTextarea.value = JSON.stringify(catalog.pairs, null, 2);
+}
+
+function parseCandlesJson(raw: string): PairCandle[] {
+ if (raw.trim() === "") {
+ return [];
+ }
+
+ try {
+ return JSON.parse(raw) as PairCandle[];
+ } catch {
+ return [];
+ }
+}
+
+function parseVolume(text: string | null, fallback: number): number {
+ if (text === null || text.trim() === "") {
+ return Number(fallback);
+ }
+
+ const parsed = Number.parseFloat(text);
+ if (Number.isNaN(parsed)) {
+ return Number(fallback);
+ }
+
+ return parsed;
+}
+
+function renderCandlesChart(
+ chart: echarts.ECharts,
+ chartMeta: HTMLElement,
+ pairId: number,
+ timeframeSeconds: number,
+ candles: PairCandle[],
+): void {
+ if (candles.length === 0) {
+ setEmptyChart(chart, chartMeta, "Aucune candle disponible.");
+ return;
+ }
+
+ const sorted = [...candles].sort(
+ (left, right) => left.bucket_start_unix - right.bucket_start_unix,
+ );
+
+ const categoryData = sorted.map((candle) =>
+ new Date(Number(candle.bucket_start_unix) * 1000).toLocaleString("fr-CH", {
+ hour12: false,
+ year: "2-digit",
+ month: "2-digit",
+ day: "2-digit",
+ hour: "2-digit",
+ minute: "2-digit",
+ }),
+ );
+
+ const ohlcData = sorted.map((candle) => [
+ candle.open_price_quote_per_base,
+ candle.close_price_quote_per_base,
+ candle.low_price_quote_per_base,
+ candle.high_price_quote_per_base,
+ ]);
+
+ const volumeData = sorted.map((candle) =>
+ parseVolume(candle.quote_volume_raw, candle.trade_count),
+ );
+
+ chartMeta.textContent =
+ `Pair #${pairId.toString()} • timeframe ${timeframeSeconds.toString()}s • ${sorted.length} candles`;
+
+ chart.setOption({
+ animation: false,
+ legend: {
+ data: ["OHLC", "Volume"],
+ top: 0,
+ },
+ tooltip: {
+ trigger: "axis",
+ axisPointer: {
+ type: "cross",
+ },
+ },
+ axisPointer: {
+ link: [{ xAxisIndex: "all" }],
+ },
+ grid: [
+ { left: 60, right: 24, top: 40, height: "58%" },
+ { left: 60, right: 24, top: "74%", height: "16%" },
+ ],
+ xAxis: [
+ {
+ type: "category",
+ data: categoryData,
+ boundaryGap: true,
+ axisLine: { onZero: false },
+ splitLine: { show: false },
+ min: "dataMin",
+ max: "dataMax",
+ },
+ {
+ type: "category",
+ gridIndex: 1,
+ data: categoryData,
+ boundaryGap: true,
+ axisLine: { onZero: false },
+ axisTick: { show: false },
+ splitLine: { show: false },
+ axisLabel: { show: false },
+ min: "dataMin",
+ max: "dataMax",
+ },
+ ],
+ yAxis: [
+ {
+ scale: true,
+ splitArea: { show: false },
+ },
+ {
+ gridIndex: 1,
+ scale: true,
+ splitNumber: 2,
+ },
+ ],
+ dataZoom: [
+ {
+ type: "inside",
+ xAxisIndex: [0, 1],
+ start: 0,
+ end: 100,
+ },
+ {
+ show: true,
+ type: "slider",
+ xAxisIndex: [0, 1],
+ bottom: 6,
+ start: 0,
+ end: 100,
+ },
+ ],
+ series: [
+ {
+ name: "OHLC",
+ type: "candlestick",
+ data: ohlcData,
+ },
+ {
+ name: "Volume",
+ type: "bar",
+ xAxisIndex: 1,
+ yAxisIndex: 1,
+ data: volumeData,
+ },
+ ],
+ }, true);
+}
+
+document.addEventListener("DOMContentLoaded", async () => {
+ void takeoverConsole();
+ debug("demo_pipeline2 window loaded");
+
+ const tooltipTriggerList = document.querySelectorAll('[data-bs-toggle="tooltip"]');
+ Array.from(tooltipTriggerList).map((tooltipTriggerEl) => new bootstrap.Tooltip(tooltipTriggerEl));
+
+ const refreshCatalogButton = document.querySelector("#demoPipeline2RefreshCatalogButton");
+ const tokensTextarea = document.querySelector("#demoPipeline2TokensTextarea");
+ const poolsTextarea = document.querySelector("#demoPipeline2PoolsTextarea");
+ const pairsTextarea = document.querySelector("#demoPipeline2PairsTextarea");
+
+ const httpRoleInput = document.querySelector("#demoPipeline2HttpRoleInput");
+ const mintInput = document.querySelector("#demoPipeline2MintInput");
+ const mintSignatureLimitInput = document.querySelector("#demoPipeline2MintSignatureLimitInput");
+ const mintPoolLimitInput = document.querySelector("#demoPipeline2MintPoolLimitInput");
+ const backfillMintButton = document.querySelector("#demoPipeline2BackfillMintButton");
+
+ const poolInput = document.querySelector("#demoPipeline2PoolInput");
+ const poolSignatureLimitInput = document.querySelector("#demoPipeline2PoolSignatureLimitInput");
+ const backfillPoolButton = document.querySelector("#demoPipeline2BackfillPoolButton");
+
+ const pairSelect = document.querySelector("#demoPipeline2PairSelect");
+ const timeframeSelect = document.querySelector("#demoPipeline2TimeframeSelect");
+ const customTimeframeInput = document.querySelector("#demoPipeline2CustomTimeframeInput");
+ const preferMaterializedInput = document.querySelector("#demoPipeline2PreferMaterializedInput");
+ const loadCandlesButton = document.querySelector("#demoPipeline2LoadCandlesButton");
+
+ const backfillSummaryTextarea = document.querySelector("#demoPipeline2BackfillSummaryTextarea");
+ const chartElement = document.querySelector("#demoPipeline2Chart");
+ const chartMeta = document.querySelector("#demoPipeline2ChartMeta");
+
+ const clearLogButton = document.querySelector("#demoPipeline2ClearLogButton");
+ const logTextarea = document.querySelector("#demoPipeline2LogTextarea");
+
+ if (
+ !refreshCatalogButton ||
+ !tokensTextarea ||
+ !poolsTextarea ||
+ !pairsTextarea ||
+ !httpRoleInput ||
+ !mintInput ||
+ !mintSignatureLimitInput ||
+ !mintPoolLimitInput ||
+ !backfillMintButton ||
+ !poolInput ||
+ !poolSignatureLimitInput ||
+ !backfillPoolButton ||
+ !pairSelect ||
+ !timeframeSelect ||
+ !customTimeframeInput ||
+ !preferMaterializedInput ||
+ !loadCandlesButton ||
+ !backfillSummaryTextarea ||
+ !chartElement ||
+ !chartMeta ||
+ !clearLogButton ||
+ !logTextarea
+ ) {
+ console.error("demo_pipeline2 DOM is incomplete");
+ return;
+ }
+
+ const safeTokensTextarea = tokensTextarea;
+ const safePoolsTextarea = poolsTextarea;
+ const safePairsTextarea = pairsTextarea;
+
+
+ const safePairSelect = pairSelect;
+ const safeChartElement = chartElement;
+ const safeChartMeta = chartMeta;
+
+ const safeLogTextarea = logTextarea;
+
+ const chart = echarts.init(safeChartElement);
+ setEmptyChart(chart, safeChartMeta, "Aucune candle disponible.");
+ window.addEventListener("resize", () => chart.resize());
+
+ clearLogButton.addEventListener("click", () => {
+ logTextarea.value = "";
+ });
+
+ let currentCatalog: KbDemoPipeline2CatalogPayload | null = null;
+
+ async function refreshCatalog(): Promise {
+ appendLogLine(safeLogTextarea, "[ui] refreshing local catalog");
+
+ try {
+ const catalog = await invoke("demo_pipeline2_get_catalog");
+ currentCatalog = catalog;
+
+ renderCatalogTextareas(catalog, safeTokensTextarea, safePoolsTextarea, safePairsTextarea);
+ refreshPairSelect(catalog, safePairSelect);
+
+ appendLogLine(
+ safeLogTextarea,
+ `[ui] catalog refreshed: ${catalog.tokens.length} tokens, ${catalog.pools.length} pools, ${catalog.pairs.length} pairs`,
+ );
+ } catch (error) {
+ appendLogLine(safeLogTextarea, `[ui] catalog refresh error: ${String(error)}`);
+ }
+ }
+
+ refreshCatalogButton.addEventListener("click", () => {
+ void refreshCatalog();
+ });
+
+ backfillMintButton.addEventListener("click", async () => {
+ const tokenMint = mintInput.value.trim();
+ if (tokenMint === "") {
+ appendLogLine(logTextarea, "[ui] token mint is required");
+ return;
+ }
+
+ const mintSignatureLimit = readPositiveIntegerInput(
+ mintSignatureLimitInput,
+ logTextarea,
+ "mintSignatureLimit",
+ );
+ if (mintSignatureLimit === undefined) {
+ return;
+ }
+
+ const poolSignatureLimit = readPositiveIntegerInput(
+ mintPoolLimitInput,
+ logTextarea,
+ "poolSignatureLimit",
+ );
+ if (poolSignatureLimit === undefined) {
+ return;
+ }
+
+ const httpRoleText = httpRoleInput.value.trim();
+ const httpRole = httpRoleText === "" ? null : httpRoleText;
+
+ appendLogLine(
+ logTextarea,
+ `[ui] launching token backfill for '${tokenMint}' with role '${httpRole ?? "history_backfill"}'`,
+ );
+
+ const request: KbDemoPipeline2BackfillTokenRequest = {
+ tokenMint,
+ httpRole,
+ mintSignatureLimit,
+ poolSignatureLimit,
+ };
+
+ try {
+ const payload = await invoke(
+ "demo_pipeline2_backfill_token_mint",
+ { request },
+ );
+
+ backfillSummaryTextarea.value = payload.summaryJson;
+ currentCatalog = payload.catalog;
+ renderCatalogTextareas(payload.catalog, tokensTextarea, poolsTextarea, pairsTextarea);
+ refreshPairSelect(payload.catalog, pairSelect);
+
+ appendLogLine(logTextarea, `[ui] token backfill completed for '${payload.objectKey}'`);
+ } catch (error) {
+ appendLogLine(logTextarea, `[ui] token backfill error: ${String(error)}`);
+ }
+ });
+
+ backfillPoolButton.addEventListener("click", async () => {
+ const poolAddress = poolInput.value.trim();
+ if (poolAddress === "") {
+ appendLogLine(logTextarea, "[ui] pool address is required");
+ return;
+ }
+
+ const poolSignatureLimit = readPositiveIntegerInput(
+ poolSignatureLimitInput,
+ logTextarea,
+ "poolSignatureLimit",
+ );
+ if (poolSignatureLimit === undefined) {
+ return;
+ }
+
+ const httpRoleText = httpRoleInput.value.trim();
+ const httpRole = httpRoleText === "" ? null : httpRoleText;
+
+ appendLogLine(
+ logTextarea,
+ `[ui] launching pool backfill for '${poolAddress}' with role '${httpRole ?? "history_backfill"}'`,
+ );
+
+ const request: KbDemoPipeline2BackfillPoolRequest = {
+ poolAddress,
+ httpRole,
+ poolSignatureLimit,
+ };
+
+ try {
+ const payload = await invoke(
+ "demo_pipeline2_backfill_pool_address",
+ { request },
+ );
+
+ backfillSummaryTextarea.value = payload.summaryJson;
+ currentCatalog = payload.catalog;
+ renderCatalogTextareas(payload.catalog, tokensTextarea, poolsTextarea, pairsTextarea);
+ refreshPairSelect(payload.catalog, pairSelect);
+
+ appendLogLine(logTextarea, `[ui] pool backfill completed for '${payload.objectKey}'`);
+ } catch (error) {
+ appendLogLine(logTextarea, `[ui] pool backfill error: ${String(error)}`);
+ }
+ });
+
+ loadCandlesButton.addEventListener("click", async () => {
+ const pairIdText = pairSelect.value.trim();
+ if (pairIdText === "") {
+ appendLogLine(logTextarea, "[ui] pair selection is required");
+ return;
+ }
+
+ const parsedPairId = Number.parseInt(pairIdText, 10);
+ if (Number.isNaN(parsedPairId) || parsedPairId <= 0) {
+ appendLogLine(logTextarea, `[ui] invalid pair id '${pairIdText}'`);
+ return;
+ }
+
+ let timeframeSeconds = Number.parseInt(timeframeSelect.value.trim(), 10);
+ const customTimeframeText = customTimeframeInput.value.trim();
+ if (customTimeframeText !== "") {
+ const parsedCustom = Number.parseInt(customTimeframeText, 10);
+ if (Number.isNaN(parsedCustom) || parsedCustom <= 0) {
+ appendLogLine(logTextarea, `[ui] invalid custom timeframe '${customTimeframeText}'`);
+ return;
+ }
+ timeframeSeconds = parsedCustom;
+ }
+
+ appendLogLine(
+ logTextarea,
+ `[ui] loading candles for pair '${parsedPairId}' timeframe '${timeframeSeconds}s'`,
+ );
+
+ const request: KbDemoPipeline2PairCandlesRequest = {
+ pairId: parsedPairId,
+ timeframeSeconds,
+ preferMaterialized: preferMaterializedInput.checked,
+ };
+
+ try {
+ const payload = await invoke(
+ "demo_pipeline2_get_pair_candles",
+ { request },
+ );
+
+ const candles = parseCandlesJson(payload.candlesJson);
+ renderCandlesChart(
+ chart,
+ chartMeta,
+ payload.pairId,
+ payload.timeframeSeconds,
+ candles,
+ );
+
+ appendLogLine(
+ logTextarea,
+ `[ui] loaded ${candles.length} candles for pair '${payload.pairId.toString()}'`,
+ );
+ } catch (error) {
+ appendLogLine(logTextarea, `[ui] load candles error: ${String(error)}`);
+ setEmptyChart(chart, chartMeta, "Erreur lors du chargement des candles.");
+ }
+ });
+
+ await refreshCatalog();
+
+ if (currentCatalog !== null && currentCatalog.pairs.length > 0) {
+ pairSelect.value = currentCatalog.pairs[0].pairId.toString();
+ }
+});
diff --git a/kb_app/frontend/ts/main.ts b/kb_app/frontend/ts/main.ts
index 487feac..5d96fd9 100644
--- a/kb_app/frontend/ts/main.ts
+++ b/kb_app/frontend/ts/main.ts
@@ -39,6 +39,14 @@ async function openDemoPipelineWindow(): Promise {
console.error("open_demo_pipeline_window failed:", error);
}
}
+
+async function openDemoPipeline2Window(): Promise {
+ try {
+ await invoke("open_demo_pipeline2_window");
+ } catch (error) {
+ console.error("open_demo_pipeline2_window2 failed:", error);
+ }
+}
document.addEventListener("DOMContentLoaded", async () => {
void takeoverConsole();
@@ -87,6 +95,8 @@ document.addEventListener("DOMContentLoaded", async () => {
const openDemoWsManagerButtonSecondary = document.querySelector("#openDemoWsManagerButtonSecondary");
const openDemoPipelineButton = document.querySelector("#openDemoPipelineButton");
const openDemoPipelineButtonSecondary = document.querySelector("#openDemoPipelineButtonSecondary");
+ const openDemoPipeline2Button = document.querySelector("#openDemoPipeline2Button");
+ const openDemoPipeline2ButtonSecondary = document.querySelector("#openDemoPipeline2ButtonSecondary");
if (openDemoWsButton) {
openDemoWsButton.addEventListener("click", () => {
@@ -136,4 +146,16 @@ document.addEventListener("DOMContentLoaded", async () => {
});
}
+ if (openDemoPipeline2Button) {
+ openDemoPipeline2Button.addEventListener("click", () => {
+ void openDemoPipeline2Window();
+ });
+ }
+
+ if (openDemoPipeline2ButtonSecondary) {
+ openDemoPipeline2ButtonSecondary.addEventListener("click", () => {
+ void openDemoPipeline2Window();
+ });
+ }
+
});
\ No newline at end of file
diff --git a/kb_app/gen/schemas/capabilities.json b/kb_app/gen/schemas/capabilities.json
index 04512ec..cca42be 100644
--- a/kb_app/gen/schemas/capabilities.json
+++ b/kb_app/gen/schemas/capabilities.json
@@ -1 +1 @@
-{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager","demo_pipeline"],"permissions":["core:default","tracing:default"]}}
\ No newline at end of file
+{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager","demo_pipeline","demo_pipeline2"],"permissions":["core:default","tracing:default"]}}
\ No newline at end of file
diff --git a/kb_app/package.json b/kb_app/package.json
index d2e4f2f..c15d41c 100644
--- a/kb_app/package.json
+++ b/kb_app/package.json
@@ -1,7 +1,7 @@
{
"name": "kb-app",
"private": true,
- "version": "0.7.23",
+ "version": "0.7.24",
"type": "module",
"scripts": {
"dev": "vite",
diff --git a/kb_app/src/demo_pipeline.rs b/kb_app/src/demo_pipeline.rs
index 3a68170..2978688 100644
--- a/kb_app/src/demo_pipeline.rs
+++ b/kb_app/src/demo_pipeline.rs
@@ -6,7 +6,10 @@ use tauri::Manager;
/// Request payload for one pipeline inspection by signature.
#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectRequest.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineInspectRequest.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineInspectRequest {
/// Transaction signature to inspect.
@@ -17,7 +20,10 @@ pub(crate) struct KbDemoPipelineInspectRequest {
/// Response payload for one pipeline inspection.
#[derive(Clone, Debug, serde::Serialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPayload.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPayload.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineInspectPayload {
/// Inspected signature.
@@ -50,7 +56,10 @@ pub(crate) struct KbDemoPipelineInspectPayload {
/// Request payload for one pipeline inspection by token mint.
#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectTokenRequest.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineInspectTokenRequest.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineInspectTokenRequest {
/// Token mint to inspect.
@@ -61,7 +70,10 @@ pub(crate) struct KbDemoPipelineInspectTokenRequest {
/// Request payload for one pipeline inspection by pair id.
#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPairRequest.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPairRequest.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineInspectPairRequest {
/// Pair id to inspect.
@@ -72,7 +84,10 @@ pub(crate) struct KbDemoPipelineInspectPairRequest {
/// Request payload for one pipeline inspection by pool address.
#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPoolRequest.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPoolRequest.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineInspectPoolRequest {
/// Pool address to inspect.
@@ -83,7 +98,10 @@ pub(crate) struct KbDemoPipelineInspectPoolRequest {
/// Request payload for one token backfill launched from `kb_app`.
#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenRequest.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenRequest.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineBackfillTokenRequest {
/// Token mint to backfill.
@@ -98,7 +116,10 @@ pub(crate) struct KbDemoPipelineBackfillTokenRequest {
/// Response payload for one token backfill launched from `kb_app`.
#[derive(Clone, Debug, serde::Serialize, ts_rs::TS)]
-#[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenPayload.ts")]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenPayload.ts"
+)]
#[serde(rename_all = "camelCase")]
pub(crate) struct KbDemoPipelineBackfillTokenPayload {
/// Backfilled token mint.
@@ -111,6 +132,109 @@ pub(crate) struct KbDemoPipelineBackfillTokenPayload {
pub token_persisted_after_backfill: bool,
}
+/// Request payload for one pool backfill launched from `kb_app`.
+#[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillPoolRequest.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipelineBackfillPoolRequest {
+ /// Pool address to backfill.
+ pub pool_address: std::string::String,
+ /// HTTP role used to select one endpoint in the pool.
+ pub http_role: std::option::Option,
+ /// Maximum number of signatures fetched from the pool address.
+ pub pool_signature_limit: u32,
+}
+
+/// Response payload for one pool backfill launched from `kb_app`.
+#[derive(Clone, Debug, serde::Serialize, ts_rs::TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillPoolPayload.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipelineBackfillPoolPayload {
+ /// Backfilled pool address.
+ pub pool_address: std::string::String,
+ /// HTTP role used during backfill.
+ pub http_role: std::string::String,
+ /// Pretty JSON summary returned by `KbTokenBackfillService::backfill_pool_by_address`.
+ pub backfill_json: std::string::String,
+ /// Whether the pool exists in persisted pool objects after backfill.
+ pub pool_persisted_after_backfill: bool,
+}
+
+/// Launches one pool backfill through the persisted `kb_lib` services.
+#[tauri::command]
+pub(crate) async fn demo_pipeline_backfill_pool_address(
+ state: tauri::State<'_, crate::KbAppState>,
+ request: KbDemoPipelineBackfillPoolRequest,
+) -> Result {
+ let pool_address = request.pool_address.trim().to_string();
+ if pool_address.is_empty() {
+ return Err("demo pipeline backfill pool address must not be empty".to_string());
+ }
+ let http_role = match request.http_role.clone() {
+ Some(http_role) => {
+ let trimmed = http_role.trim().to_string();
+ if trimmed.is_empty() {
+ "history_backfill".to_string()
+ } else {
+ trimmed
+ }
+ }
+ None => "history_backfill".to_string(),
+ };
+ if request.pool_signature_limit == 0 {
+ return Err("demo pipeline poolSignatureLimit must be > 0".to_string());
+ }
+ let database = state.database.clone();
+ let http_pool = std::sync::Arc::new(state.http_pool.clone());
+ let service =
+ kb_lib::KbTokenBackfillService::new(http_pool, database.clone(), http_role.clone());
+ let backfill_result = service
+ .backfill_pool_by_address(pool_address.as_str(), request.pool_signature_limit as usize)
+ .await;
+ let backfill = match backfill_result {
+ Ok(backfill) => backfill,
+ Err(error) => {
+ return Err(format!(
+ "cannot backfill pool address '{}' with role '{}': {}",
+ pool_address, http_role, error
+ ));
+ }
+ };
+ let pool_result = kb_lib::get_pool_by_address(database.as_ref(), pool_address.as_str()).await;
+ let pool_option = match pool_result {
+ Ok(pool_option) => pool_option,
+ Err(error) => {
+ return Err(format!(
+ "cannot verify persisted pool '{}' after backfill with role '{}': {}",
+ pool_address, http_role, error
+ ));
+ }
+ };
+ let pool_persisted_after_backfill = pool_option.is_some();
+ let backfill_json_result = serde_json::to_string_pretty(&backfill);
+ let backfill_json = match backfill_json_result {
+ Ok(backfill_json) => backfill_json,
+ Err(error) => {
+ return Err(format!(
+ "cannot serialize pool backfill result for '{}': {}",
+ pool_address, error
+ ));
+ }
+ };
+ Ok(KbDemoPipelineBackfillPoolPayload {
+ pool_address,
+ http_role,
+ backfill_json,
+ pool_persisted_after_backfill,
+ })
+}
+
/// Launches one token backfill through the persisted `kb_lib` services.
#[tauri::command]
pub(crate) async fn demo_pipeline_backfill_token_mint(
diff --git a/kb_app/src/demo_pipeline2.rs b/kb_app/src/demo_pipeline2.rs
new file mode 100644
index 0000000..7550ac5
--- /dev/null
+++ b/kb_app/src/demo_pipeline2.rs
@@ -0,0 +1,524 @@
+// file: kb_app/src/demo_pipeline2.rs
+
+//! Tauri commands for the focused pipeline demo window.
+//!
+//! This demo is intentionally narrower than `Demo Pipeline`:
+//! - read the local catalog of tokens / pools / pairs,
+//! - trigger targeted backfills from the chain,
+//! - load candles for one selected pair and timeframe.
+
+use tauri::Manager;
+use ts_rs::TS;
+
+/// One token item for the local catalog.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2TokenItem.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2TokenItem {
+ /// Token mint.
+ pub mint: std::string::String,
+ /// Optional token symbol.
+ pub symbol: std::option::Option,
+ /// Optional token name.
+ pub name: std::option::Option,
+}
+
+/// One pool item for the local catalog.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2PoolItem.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2PoolItem {
+ /// Pool address.
+ pub pool_address: std::string::String,
+ /// Optional internal pair id when known.
+ #[ts(type = "number | null")]
+ pub pair_id: std::option::Option,
+ /// Optional DEX code.
+ pub dex_code: std::option::Option,
+}
+
+/// One pair item for the local catalog.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2PairItem.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2PairItem {
+ /// Internal pair id.
+ #[ts(type = "number")]
+ pub pair_id: i64,
+ /// Related pool address.
+ pub pool_address: std::string::String,
+ /// Optional pair symbol.
+ pub symbol: std::option::Option,
+ /// Optional DEX code.
+ pub dex_code: std::option::Option,
+ /// Optional local trade count.
+ #[ts(type = "number | null")]
+ pub trade_count: std::option::Option,
+ /// Optional local last price.
+ #[ts(type = "number | null")]
+ pub last_price_quote_per_base: std::option::Option,
+}
+
+/// Full local catalog payload.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2CatalogPayload.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2CatalogPayload {
+ /// Open database URL.
+ pub database_url: std::string::String,
+ /// Observed token list.
+ pub tokens: std::vec::Vec,
+ /// Known pool list.
+ pub pools: std::vec::Vec,
+ /// Known pair list.
+ pub pairs: std::vec::Vec,
+}
+
+/// Request payload for token backfill.
+#[derive(Clone, Debug, serde::Deserialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2BackfillTokenRequest.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2BackfillTokenRequest {
+ /// Token mint to backfill.
+ pub token_mint: std::string::String,
+ /// Optional HTTP role.
+ pub http_role: std::option::Option,
+ /// Limit for signatures fetched from the mint.
+ pub mint_signature_limit: u32,
+ /// Limit for signatures fetched from each discovered pool.
+ pub pool_signature_limit: u32,
+}
+
+/// Request payload for pool backfill.
+#[derive(Clone, Debug, serde::Deserialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2BackfillPoolRequest.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2BackfillPoolRequest {
+ /// Pool address to backfill.
+ pub pool_address: std::string::String,
+ /// Optional HTTP role.
+ pub http_role: std::option::Option,
+ /// Limit for signatures fetched from the pool.
+ pub pool_signature_limit: u32,
+}
+
+/// Shared backfill response payload.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2BackfillPayload.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2BackfillPayload {
+ /// Object key used by the backfill.
+ pub object_key: std::string::String,
+ /// Mode: `tokenMint` or `poolAddress`.
+ pub mode: std::string::String,
+ /// HTTP role used.
+ pub http_role: std::string::String,
+ /// Pretty JSON summary.
+ pub summary_json: std::string::String,
+ /// Refreshed local catalog after backfill.
+ pub catalog: KbDemoPipeline2CatalogPayload,
+}
+
+/// Request payload for pair candles.
+#[derive(Clone, Debug, serde::Deserialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2PairCandlesRequest.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2PairCandlesRequest {
+ /// Pair id to load.
+ #[ts(type = "number")]
+ pub pair_id: i64,
+ /// Timeframe in seconds.
+ #[ts(type = "number")]
+ pub timeframe_seconds: i64,
+ /// Whether materialized candles should be preferred when available.
+ pub prefer_materialized: bool,
+}
+
+/// Candle payload returned to the UI.
+#[derive(Clone, Debug, serde::Serialize, TS)]
+#[ts(
+ export,
+ export_to = "../frontend/ts/bindings/KbDemoPipeline2PairCandlesPayload.ts"
+)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoPipeline2PairCandlesPayload {
+ /// Pair id.
+ #[ts(type = "number")]
+ pub pair_id: i64,
+ /// Timeframe in seconds.
+ #[ts(type = "number")]
+ pub timeframe_seconds: i64,
+ /// Pretty JSON array of candles.
+ pub candles_json: std::string::String,
+}
+
+/// Opens the `Demo Pipeline 2` window.
+#[tauri::command]
+pub(crate) fn open_demo_pipeline2_window(
+ app_handle: tauri::AppHandle,
+) -> Result<(), std::string::String> {
+ let existing_window_option = app_handle.get_webview_window("demo_pipeline2");
+
+ let demo_window = match existing_window_option {
+ Some(demo_window) => demo_window,
+ None => {
+ let builder = tauri::WebviewWindowBuilder::new(
+ &app_handle,
+ "demo_pipeline2",
+ tauri::WebviewUrl::App("demo_pipeline2.html".into()),
+ )
+ .title("Demo Pipeline 2")
+ .inner_size(1480.0, 920.0)
+ .min_inner_size(1100.0, 720.0)
+ .center()
+ .visible(true)
+ .transparent(false)
+ .decorations(true);
+ let build_result = builder.build();
+ match build_result {
+ Ok(window) => window,
+ Err(error) => {
+ return Err(format!("cannot create demo_pipeline2 window: {error:?}"));
+ }
+ }
+ }
+ };
+ let show_result = demo_window.show();
+ if let Err(error) = show_result {
+ return Err(format!("cannot show demo_pipeline2 window: {error:?}"));
+ }
+ let focus_result = demo_window.set_focus();
+ if let Err(error) = focus_result {
+ return Err(format!("cannot focus demo_pipeline2 window: {error:?}"));
+ }
+ Ok(())
+}
+
+/// Returns the local catalog of observed tokens, pools and pairs.
+#[tauri::command]
+pub(crate) async fn demo_pipeline2_get_catalog(
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result {
+ kb_demo_pipeline2_build_catalog(state.database.clone()).await
+}
+
+/// Runs a targeted token backfill then returns the refreshed catalog.
+#[tauri::command]
+pub(crate) async fn demo_pipeline2_backfill_token_mint(
+ state: tauri::State<'_, crate::KbAppState>,
+ request: KbDemoPipeline2BackfillTokenRequest,
+) -> Result {
+ let token_mint = request.token_mint.trim().to_string();
+ if token_mint.is_empty() {
+ return Err("token mint must not be empty".to_string());
+ }
+ if request.mint_signature_limit == 0 {
+ return Err("mintSignatureLimit must be > 0".to_string());
+ }
+ if request.pool_signature_limit == 0 {
+ return Err("poolSignatureLimit must be > 0".to_string());
+ }
+ let http_role = kb_demo_pipeline2_normalize_http_role(request.http_role);
+ let database = state.database.clone();
+ let http_pool = std::sync::Arc::new(state.http_pool.clone());
+ let service =
+ kb_lib::KbTokenBackfillService::new(http_pool, database.clone(), http_role.clone());
+ let result = service
+ .backfill_token_by_mint(
+ token_mint.as_str(),
+ request.mint_signature_limit as usize,
+ request.pool_signature_limit as usize,
+ )
+ .await;
+ let backfill = match result {
+ Ok(backfill) => backfill,
+ Err(error) => {
+ return Err(format!(
+ "cannot backfill token mint '{}' with role '{}': {}",
+ token_mint, http_role, error
+ ));
+ }
+ };
+ let summary_json_result = serde_json::to_string_pretty(&backfill);
+ let summary_json = match summary_json_result {
+ Ok(summary_json) => summary_json,
+ Err(error) => {
+ return Err(format!(
+ "cannot serialize token backfill result for '{}': {}",
+ token_mint, error
+ ));
+ }
+ };
+ let catalog = kb_demo_pipeline2_build_catalog(database).await?;
+ Ok(KbDemoPipeline2BackfillPayload {
+ object_key: token_mint,
+ mode: "tokenMint".to_string(),
+ http_role,
+ summary_json,
+ catalog,
+ })
+}
+
+/// Runs a targeted pool backfill then returns the refreshed catalog.
+#[tauri::command]
+pub(crate) async fn demo_pipeline2_backfill_pool_address(
+ state: tauri::State<'_, crate::KbAppState>,
+ request: KbDemoPipeline2BackfillPoolRequest,
+) -> Result {
+ let pool_address = request.pool_address.trim().to_string();
+ if pool_address.is_empty() {
+ return Err("pool address must not be empty".to_string());
+ }
+ if request.pool_signature_limit == 0 {
+ return Err("poolSignatureLimit must be > 0".to_string());
+ }
+ let http_role = kb_demo_pipeline2_normalize_http_role(request.http_role);
+ let database = state.database.clone();
+ let http_pool = std::sync::Arc::new(state.http_pool.clone());
+ let service =
+ kb_lib::KbTokenBackfillService::new(http_pool, database.clone(), http_role.clone());
+ let result = service
+ .backfill_pool_by_address(pool_address.as_str(), request.pool_signature_limit as usize)
+ .await;
+ let backfill = match result {
+ Ok(backfill) => backfill,
+ Err(error) => {
+ return Err(format!(
+ "cannot backfill pool address '{}' with role '{}': {}",
+ pool_address, http_role, error
+ ));
+ }
+ };
+ let summary_json_result = serde_json::to_string_pretty(&backfill);
+ let summary_json = match summary_json_result {
+ Ok(summary_json) => summary_json,
+ Err(error) => {
+ return Err(format!(
+ "cannot serialize pool backfill result for '{}': {}",
+ pool_address, error
+ ));
+ }
+ };
+ let catalog = kb_demo_pipeline2_build_catalog(database).await?;
+ Ok(KbDemoPipeline2BackfillPayload {
+ object_key: pool_address,
+ mode: "poolAddress".to_string(),
+ http_role,
+ summary_json,
+ catalog,
+ })
+}
+
+/// Loads candles for one pair and one timeframe.
+#[tauri::command]
+pub(crate) async fn demo_pipeline2_get_pair_candles(
+ state: tauri::State<'_, crate::KbAppState>,
+ request: KbDemoPipeline2PairCandlesRequest,
+) -> Result {
+ if request.pair_id <= 0 {
+ return Err("pairId must be > 0".to_string());
+ }
+ if request.timeframe_seconds <= 0 {
+ return Err("timeframeSeconds must be > 0".to_string());
+ }
+ let query_service = kb_lib::KbPairCandleQueryService::new(state.database.clone());
+ let candles_result = query_service
+ .list_pair_candles(
+ request.pair_id,
+ request.timeframe_seconds,
+ None,
+ None,
+ request.prefer_materialized,
+ )
+ .await;
+ let candles = match candles_result {
+ Ok(candles) => candles,
+ Err(error) => {
+ return Err(format!(
+ "cannot load candles for pair '{}' timeframe '{}': {}",
+ request.pair_id, request.timeframe_seconds, error
+ ));
+ }
+ };
+ let candles_json_result = serde_json::to_string_pretty(&candles);
+ let candles_json = match candles_json_result {
+ Ok(candles_json) => candles_json,
+ Err(error) => {
+ return Err(format!(
+ "cannot serialize candles for pair '{}' timeframe '{}': {}",
+ request.pair_id, request.timeframe_seconds, error
+ ));
+ }
+ };
+ Ok(KbDemoPipeline2PairCandlesPayload {
+ pair_id: request.pair_id,
+ timeframe_seconds: request.timeframe_seconds,
+ candles_json,
+ })
+}
+
+async fn kb_demo_pipeline2_build_catalog(
+ database: std::sync::Arc,
+) -> Result {
+ let dexes_result = kb_lib::list_dexes(database.as_ref()).await;
+ let dexes = match dexes_result {
+ Ok(dexes) => dexes,
+ Err(error) => {
+ return Err(format!("cannot list DEXes: {}", error));
+ }
+ };
+ let mut dex_code_by_id = std::collections::BTreeMap::::new();
+ for dex in dexes {
+ if let Some(dex_id) = dex.id {
+ dex_code_by_id.insert(dex_id, dex.code);
+ }
+ }
+ let tokens_result = kb_lib::list_tokens(database.as_ref()).await;
+ let db_tokens = match tokens_result {
+ Ok(db_tokens) => db_tokens,
+ Err(error) => {
+ return Err(format!("cannot list tokens: {}", error));
+ }
+ };
+
+ let mut tokens = std::vec::Vec::::new();
+ for token in db_tokens {
+ tokens.push(KbDemoPipeline2TokenItem {
+ mint: token.mint,
+ symbol: token.symbol,
+ name: token.name,
+ });
+ }
+ let pools_result = kb_lib::list_pools(database.as_ref()).await;
+ let pools = match pools_result {
+ Ok(pools) => pools,
+ Err(error) => {
+ return Err(format!("cannot list pools: {}", error));
+ }
+ };
+ let pairs_result = kb_lib::list_pairs(database.as_ref()).await;
+ let pairs = match pairs_result {
+ Ok(pairs) => pairs,
+ Err(error) => {
+ return Err(format!("cannot list pairs: {}", error));
+ }
+ };
+ let mut pair_by_pool_id = std::collections::BTreeMap::::new();
+ for pair in &pairs {
+ pair_by_pool_id.insert(pair.pool_id, pair.clone());
+ }
+ let mut pair_items = std::vec::Vec::::new();
+ for pair in pairs {
+ let pair_id = match pair.id {
+ Some(pair_id) => pair_id,
+ None => continue,
+ };
+ let pool_result = kb_lib::get_pool_by_address(database.as_ref(), "").await;
+ let _ = pool_result;
+ let pool_address = {
+ let all_pools_result = kb_lib::list_pools(database.as_ref()).await;
+ let all_pools = match all_pools_result {
+ Ok(all_pools) => all_pools,
+ Err(error) => {
+ return Err(format!("cannot reload pools for pair catalog: {}", error));
+ }
+ };
+ let mut found_address = std::string::String::new();
+ for pool in all_pools {
+ let pool_id = match pool.id {
+ Some(pool_id) => pool_id,
+ None => continue,
+ };
+ if pool_id == pair.pool_id {
+ found_address = pool.address;
+ break;
+ }
+ }
+ found_address
+ };
+ let pair_metric_result =
+ kb_lib::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await;
+ let pair_metric_option = match pair_metric_result {
+ Ok(pair_metric_option) => pair_metric_option,
+ Err(error) => {
+ return Err(format!(
+ "cannot fetch pair metric for pair '{}': {}",
+ pair_id, error
+ ));
+ }
+ };
+ let trade_count = pair_metric_option.as_ref().map(|metric| metric.trade_count);
+ let last_price_quote_per_base =
+ pair_metric_option.and_then(|metric| metric.last_price_quote_per_base);
+ pair_items.push(KbDemoPipeline2PairItem {
+ pair_id,
+ pool_address,
+ symbol: pair.symbol,
+ dex_code: dex_code_by_id.get(&pair.dex_id).cloned(),
+ trade_count,
+ last_price_quote_per_base,
+ });
+ }
+ let mut pool_items = std::vec::Vec::::new();
+ for pool in pools {
+ let pool_id = match pool.id {
+ Some(pool_id) => pool_id,
+ None => continue,
+ };
+ let pair_id = pair_by_pool_id.get(&pool_id).and_then(|pair| pair.id);
+ pool_items.push(KbDemoPipeline2PoolItem {
+ pool_address: pool.address,
+ pair_id,
+ dex_code: dex_code_by_id.get(&pool.dex_id).cloned(),
+ });
+ }
+ tokens.sort_by(|left, right| left.mint.cmp(&right.mint));
+ pool_items.sort_by(|left, right| left.pool_address.cmp(&right.pool_address));
+ pair_items.sort_by(|left, right| left.pair_id.cmp(&right.pair_id));
+ Ok(KbDemoPipeline2CatalogPayload {
+ database_url: database.database_url().to_string(),
+ tokens,
+ pools: pool_items,
+ pairs: pair_items,
+ })
+}
+
+fn kb_demo_pipeline2_normalize_http_role(
+ role: std::option::Option,
+) -> std::string::String {
+ match role {
+ Some(role) => {
+ let trimmed = role.trim().to_string();
+ if trimmed.is_empty() {
+ "history_backfill".to_string()
+ } else {
+ trimmed
+ }
+ }
+ None => "history_backfill".to_string(),
+ }
+}
diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs
index 23b8ad3..76e9724 100644
--- a/kb_app/src/lib.rs
+++ b/kb_app/src/lib.rs
@@ -11,6 +11,7 @@
mod demo_http;
mod demo_pipeline;
+mod demo_pipeline2;
mod demo_ws;
mod demo_ws_manager;
mod splash;
@@ -143,6 +144,12 @@ pub async fn run() -> Result<(), kb_lib::KbError> {
crate::demo_pipeline::demo_pipeline_inspect_pair_id,
crate::demo_pipeline::demo_pipeline_inspect_pool_address,
crate::demo_pipeline::demo_pipeline_backfill_token_mint,
+ crate::demo_pipeline::demo_pipeline_backfill_pool_address,
+ crate::demo_pipeline2::open_demo_pipeline2_window,
+ crate::demo_pipeline2::demo_pipeline2_get_catalog,
+ crate::demo_pipeline2::demo_pipeline2_backfill_token_mint,
+ crate::demo_pipeline2::demo_pipeline2_backfill_pool_address,
+ crate::demo_pipeline2::demo_pipeline2_get_pair_candles,
]);
tauri_builder = tauri_builder.plugin(tracing_builder.build::());
tauri_builder = tauri_builder.setup(|app| {
diff --git a/kb_app/tauri.conf.json b/kb_app/tauri.conf.json
index 17e3858..bb8b692 100644
--- a/kb_app/tauri.conf.json
+++ b/kb_app/tauri.conf.json
@@ -1,7 +1,7 @@
{
"$schema": "https://schema.tauri.app/config/2",
"productName": "kb-bapp",
- "version": "0.7.23",
+ "version": "0.7.24",
"identifier": "com.sasedev.kb-app",
"build": {
"beforeDevCommand": "npm run dev",
@@ -92,6 +92,20 @@
"create": false,
"transparent": false,
"decorations": true
+ },
+ {
+ "label": "demo_pipeline2",
+ "url": "demo_pipeline2.html",
+ "title": "Demo Pipeline2",
+ "width": 1480,
+ "height": 920,
+ "minWidth": 1000,
+ "minHeight": 700,
+ "center": true,
+ "visible": false,
+ "create": false,
+ "transparent": false,
+ "decorations": true
}
],
"security": {
diff --git a/kb_app/tsconfig.json b/kb_app/tsconfig.json
index f1e1928..9ebfbed 100644
--- a/kb_app/tsconfig.json
+++ b/kb_app/tsconfig.json
@@ -17,8 +17,7 @@
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
- "noFallthroughCasesInSwitch": true,
- "baseUrl": "./frontend"
+ "noFallthroughCasesInSwitch": true
},
"include": [
"frontend"
diff --git a/kb_app/vite.config.ts b/kb_app/vite.config.ts
index ab80038..5be3776 100644
--- a/kb_app/vite.config.ts
+++ b/kb_app/vite.config.ts
@@ -23,7 +23,11 @@ export default defineConfig(() => ({
input: {
"main": normalizePath(resolve(__dirname, 'frontend/main.html')),
"splash": normalizePath(resolve(__dirname, 'frontend/splash.html')),
- "demo_ws": normalizePath(resolve(__dirname, 'frontend/demo_ws.html'))
+ "demo_ws": normalizePath(resolve(__dirname, 'frontend/demo_ws.html')),
+ "demo_http": normalizePath(resolve(__dirname, 'frontend/demo_http.html')),
+ "demo_ws_manager": normalizePath(resolve(__dirname, 'frontend/demo_ws_manager.html')),
+ "demo_pipeline": normalizePath(resolve(__dirname, 'frontend/demo_pipeline.html')),
+ "demo_pipeline2": normalizePath(resolve(__dirname, 'frontend/demo_pipeline2.html'))
},
output: {
entryFileNames: 'js/[name]-[hash].js',
diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs
index 3120e83..a18a418 100644
--- a/kb_lib/src/db.rs
+++ b/kb_lib/src/db.rs
@@ -134,6 +134,7 @@ pub use queries::list_recent_onchain_observations;
pub use queries::list_recent_swaps;
pub use queries::list_recent_token_burn_events;
pub use queries::list_recent_token_mint_events;
+pub use queries::list_tokens;
pub use queries::list_trade_events_by_pair_id;
pub use queries::list_trade_events_by_transaction_id;
pub use queries::list_wallet_holdings_by_wallet_id;
diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs
index fcebaa0..0c4651c 100644
--- a/kb_lib/src/db/queries.rs
+++ b/kb_lib/src/db/queries.rs
@@ -109,6 +109,7 @@ pub use pool_token::upsert_pool_token;
pub use swap::list_recent_swaps;
pub use swap::upsert_swap;
pub use token::get_token_by_mint;
+pub use token::list_tokens;
pub use token::upsert_token;
pub use token_burn_event::list_recent_token_burn_events;
pub use token_burn_event::upsert_token_burn_event;
diff --git a/kb_lib/src/db/queries/token.rs b/kb_lib/src/db/queries/token.rs
index 9beab15..6414e8f 100644
--- a/kb_lib/src/db/queries/token.rs
+++ b/kb_lib/src/db/queries/token.rs
@@ -119,3 +119,51 @@ LIMIT 1
}
}
}
+
+
+/// Lists all normalized token rows ordered by mint.
+pub async fn list_tokens(
+ database: &crate::KbDatabase,
+) -> Result, crate::KbError> {
+ match database.connection() {
+ crate::KbDatabaseConnection::Sqlite(pool) => {
+ let query_result = sqlx::query_as::(
+ r#"
+SELECT
+ id,
+ mint,
+ symbol,
+ name,
+ decimals,
+ token_program,
+ is_quote_token,
+ first_seen_at,
+ updated_at
+FROM kb_tokens
+ORDER BY mint ASC, id ASC
+ "#,
+ )
+ .fetch_all(pool)
+ .await;
+ let entities = match query_result {
+ Ok(entities) => entities,
+ Err(error) => {
+ return Err(crate::KbError::Db(format!(
+ "cannot list kb_tokens on sqlite: {}",
+ error
+ )));
+ }
+ };
+ let mut dtos = std::vec::Vec::new();
+ for entity in entities {
+ let dto_result = crate::KbTokenDto::try_from(entity);
+ let dto = match dto_result {
+ Ok(dto) => dto,
+ Err(error) => return Err(error),
+ };
+ dtos.push(dto);
+ }
+ Ok(dtos)
+ }
+ }
+}
\ No newline at end of file
diff --git a/kb_lib/src/db/queries/trade_event.rs b/kb_lib/src/db/queries/trade_event.rs
index 08f533b..b7a7db7 100644
--- a/kb_lib/src/db/queries/trade_event.rs
+++ b/kb_lib/src/db/queries/trade_event.rs
@@ -33,6 +33,10 @@ INSERT INTO kb_trade_events (
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(decoded_event_id) DO UPDATE SET
+ base_amount_raw = COALESCE(excluded.base_amount_raw, kb_trade_events.base_amount_raw),
+ quote_amount_raw = COALESCE(excluded.quote_amount_raw, kb_trade_events.quote_amount_raw),
+ price_quote_per_base = COALESCE(excluded.price_quote_per_base, kb_trade_events.price_quote_per_base),
+ payload_json = excluded.payload_json,
updated_at = excluded.updated_at
"#,
)
diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs
index 198e95c..dbb5436 100644
--- a/kb_lib/src/dex_detect.rs
+++ b/kb_lib/src/dex_detect.rs
@@ -839,6 +839,25 @@ impl KbDexDetectService {
Ok(base_token_id) => base_token_id,
Err(error) => return Err(error),
};
+ let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
+ let payload_value = match payload_value_result {
+ Ok(payload_value) => payload_value,
+ Err(error) => return Err(error),
+ };
+ let vault_addresses = kb_extract_pump_swap_vault_addresses(&payload_value);
+ let token_a_vault_address = vault_addresses.0;
+ let token_b_vault_address = vault_addresses.1;
+
+ let base_vault_address = if base_is_token_a {
+ token_a_vault_address.clone()
+ } else {
+ token_b_vault_address.clone()
+ };
+ let quote_vault_address = if base_is_token_a {
+ token_b_vault_address.clone()
+ } else {
+ token_a_vault_address.clone()
+ };
let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await;
let quote_token_id = match quote_token_id_result {
Ok(quote_token_id) => quote_token_id,
@@ -920,7 +939,7 @@ impl KbDexDetectService {
pool_id,
base_token_id,
crate::KbPoolTokenRole::Base,
- None,
+ base_vault_address,
Some(0),
),
)
@@ -934,7 +953,7 @@ impl KbDexDetectService {
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Quote,
- None,
+ quote_vault_address,
Some(1),
),
)
@@ -961,11 +980,6 @@ impl KbDexDetectService {
}
}
};
- let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
- let payload_value = match payload_value_result {
- Ok(payload_value) => payload_value,
- Err(error) => return Err(error),
- };
if created_pool {
let signal_result = self
.record_detection_signal(
@@ -2845,6 +2859,46 @@ fn kb_parse_payload_json(payload_json: &str) -> Result std::option::Option {
+ if index >= values.len() {
+ return None;
+ }
+ let value = &values[index];
+ let text_option = value.as_str();
+ let text = match text_option {
+ Some(text) => text.trim(),
+ None => return None,
+ };
+ if text.is_empty() {
+ return None;
+ }
+ Some(text.to_string())
+}
+
+fn kb_extract_pump_swap_vault_addresses(
+ payload_value: &serde_json::Value,
+) -> (
+ std::option::Option,
+ std::option::Option,
+) {
+ let accounts_option = payload_value.get("accounts");
+ let accounts = match accounts_option {
+ Some(accounts) => accounts,
+ None => return (None, None),
+ };
+ let accounts_array_option = accounts.as_array();
+ let accounts_array = match accounts_array_option {
+ Some(accounts_array) => accounts_array,
+ None => return (None, None),
+ };
+ let token_a_vault_address = kb_extract_string_from_array_index(accounts_array, 7);
+ let token_b_vault_address = kb_extract_string_from_array_index(accounts_array, 8);
+ (token_a_vault_address, token_b_vault_address)
+}
+
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc {
diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs
index d3bfb45..cfa9532 100644
--- a/kb_lib/src/lib.rs
+++ b/kb_lib/src/lib.rs
@@ -180,6 +180,7 @@ pub use db::list_recent_onchain_observations;
pub use db::list_recent_swaps;
pub use db::list_recent_token_burn_events;
pub use db::list_recent_token_mint_events;
+pub use db::list_tokens;
pub use db::list_trade_events_by_pair_id;
pub use db::list_trade_events_by_transaction_id;
pub use db::list_wallet_holdings_by_wallet_id;
@@ -306,6 +307,7 @@ pub use pool_origin::KbPoolOriginService;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
+pub use token_backfill::KbPoolBackfillResult;
pub use token_backfill::KbTokenBackfillResult;
pub use token_backfill::KbTokenBackfillService;
pub use tracing::KbTracingGuard;
diff --git a/kb_lib/src/token_backfill.rs b/kb_lib/src/token_backfill.rs
index 82ad245..b1cf941 100644
--- a/kb_lib/src/token_backfill.rs
+++ b/kb_lib/src/token_backfill.rs
@@ -33,6 +33,33 @@ pub struct KbTokenBackfillResult {
pub trade_event_count: usize,
}
+/// One pool-backfill result summary.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct KbPoolBackfillResult {
+ /// Input pool address.
+ pub pool_address: std::string::String,
+ /// Number of signatures returned directly for the pool address.
+ pub pool_signature_count: usize,
+ /// Number of unique signatures processed during this run.
+ pub unique_signature_count: usize,
+ /// Number of transactions resolved through HTTP during this run.
+ pub resolved_transaction_count: usize,
+ /// Number of signatures whose `getTransaction` lookup returned `null`.
+ pub missing_transaction_count: usize,
+ /// Total number of decoded DEX events replayed during this run.
+ pub decoded_event_count: usize,
+ /// Total number of DEX detection results produced during this run.
+ pub detection_count: usize,
+ /// Total number of launch-attribution results produced during this run.
+ pub launch_attribution_count: usize,
+ /// Total number of pool-origin results produced during this run.
+ pub pool_origin_count: usize,
+ /// Total number of wallet-participation observations produced during this run.
+ pub wallet_participation_count: usize,
+ /// Total number of trade-aggregation results produced during this run.
+ pub trade_event_count: usize,
+}
+
/// Historical token backfill service.
///
/// This service reuses the existing transaction projection and downstream
@@ -310,11 +337,9 @@ impl KbTokenBackfillService {
trade_event_count: 0,
});
}
- let existing_transaction_result = crate::get_chain_transaction_by_signature(
- self.database.as_ref(),
- signature.as_str(),
- )
- .await;
+ let existing_transaction_result =
+ crate::get_chain_transaction_by_signature(self.database.as_ref(), signature.as_str())
+ .await;
let existing_transaction_option = match existing_transaction_result {
Ok(existing_transaction_option) => existing_transaction_option,
Err(error) => return Err(error),
@@ -391,6 +416,160 @@ impl KbTokenBackfillService {
trade_event_count: trade_aggregations.len(),
})
}
+
+ /// Replays the historical activity of one pool address through the existing pipeline.
+ pub async fn backfill_pool_by_address(
+ &self,
+ pool_address: &str,
+ pool_signature_limit: usize,
+ ) -> Result {
+ let effective_limit = if pool_signature_limit > 1000 {
+ 1000
+ } else {
+ pool_signature_limit
+ };
+ let mut result = crate::KbPoolBackfillResult {
+ pool_address: pool_address.to_string(),
+ pool_signature_count: 0,
+ unique_signature_count: 0,
+ resolved_transaction_count: 0,
+ missing_transaction_count: 0,
+ decoded_event_count: 0,
+ detection_count: 0,
+ launch_attribution_count: 0,
+ pool_origin_count: 0,
+ wallet_participation_count: 0,
+ trade_event_count: 0,
+ };
+ let mut seen_addresses = std::collections::BTreeSet::::new();
+ let mut addresses_to_scan = std::vec::Vec::::new();
+ let trimmed_pool_address = pool_address.trim().to_string();
+ if trimmed_pool_address.is_empty() {
+ return Err(crate::KbError::Config(
+ "pool_address must not be empty".to_string(),
+ ));
+ }
+ seen_addresses.insert(trimmed_pool_address.clone());
+ addresses_to_scan.push(trimmed_pool_address.clone());
+ let pool_result =
+ crate::get_pool_by_address(self.database.as_ref(), trimmed_pool_address.as_str()).await;
+ let pool_option = match pool_result {
+ Ok(pool_option) => pool_option,
+ Err(error) => return Err(error),
+ };
+ if let Some(pool) = pool_option {
+ let pool_id = match pool.id {
+ Some(pool_id) => pool_id,
+ None => {
+ return Err(crate::KbError::InvalidState(format!(
+ "pool '{}' has no internal id",
+ pool.address
+ )));
+ }
+ };
+ let pool_tokens_result =
+ crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
+ let pool_tokens = match pool_tokens_result {
+ Ok(pool_tokens) => pool_tokens,
+ Err(error) => return Err(error),
+ };
+ for pool_token in pool_tokens {
+ let vault_address_option = pool_token.vault_address.clone();
+ let vault_address = match vault_address_option {
+ Some(vault_address) => vault_address.trim().to_string(),
+ None => continue,
+ };
+ if vault_address.is_empty() {
+ continue;
+ }
+ if seen_addresses.contains(vault_address.as_str()) {
+ continue;
+ }
+ seen_addresses.insert(vault_address.clone());
+ addresses_to_scan.push(vault_address);
+ }
+ }
+ let mut seen_signatures = std::collections::HashSet::::new();
+ for address in &addresses_to_scan {
+ let signatures_result = self
+ .fetch_signatures_for_address(address.clone(), effective_limit)
+ .await;
+ let mut signatures = match signatures_result {
+ Ok(signatures) => signatures,
+ Err(error) => return Err(error),
+ };
+ if address == &trimmed_pool_address {
+ result.pool_signature_count = signatures.len();
+ }
+ signatures.reverse();
+ for signature_status in signatures {
+ let signature = signature_status.signature.clone();
+ if seen_signatures.contains(signature.as_str()) {
+ continue;
+ }
+ seen_signatures.insert(signature.clone());
+ result.unique_signature_count += 1;
+ let replay_result = self.replay_signature(signature).await;
+ let replay_result = match replay_result {
+ Ok(replay_result) => replay_result,
+ Err(error) => return Err(error),
+ };
+ result.resolved_transaction_count += replay_result.resolved_transaction_count;
+ result.missing_transaction_count += replay_result.missing_transaction_count;
+ result.decoded_event_count += replay_result.decoded_event_count;
+ result.detection_count += replay_result.detection_count;
+ result.launch_attribution_count += replay_result.launch_attribution_count;
+ result.pool_origin_count += replay_result.pool_origin_count;
+ result.wallet_participation_count += replay_result.wallet_participation_count;
+ result.trade_event_count += replay_result.trade_event_count;
+ }
+ }
+ let summary_payload = serde_json::json!({
+ "poolAddress": result.pool_address,
+ "poolSignatureCount": result.pool_signature_count,
+ "uniqueSignatureCount": result.unique_signature_count,
+ "resolvedTransactionCount": result.resolved_transaction_count,
+ "missingTransactionCount": result.missing_transaction_count,
+ "decodedEventCount": result.decoded_event_count,
+ "detectionCount": result.detection_count,
+ "launchAttributionCount": result.launch_attribution_count,
+ "poolOriginCount": result.pool_origin_count,
+ "walletParticipationCount": result.wallet_participation_count,
+ "tradeEventCount": result.trade_event_count,
+ "scannedAddressCount": addresses_to_scan.len(),
+ "effectiveSignatureLimit": effective_limit
+ });
+ let observation_result = self
+ .persistence
+ .record_observation(&crate::KbDetectionObservationInput::new(
+ "pool.backfill.completed".to_string(),
+ crate::KbObservationSourceKind::HttpRpc,
+ Some(format!("backfill:{}", self.http_role)),
+ pool_address.to_string(),
+ None,
+ summary_payload.clone(),
+ ))
+ .await;
+ let observation_id = match observation_result {
+ Ok(observation_id) => observation_id,
+ Err(error) => return Err(error),
+ };
+ let signal_result = self
+ .persistence
+ .record_signal(&crate::KbDetectionSignalInput::new(
+ "signal.pool.backfill.completed".to_string(),
+ crate::KbAnalysisSignalSeverity::Low,
+ pool_address.to_string(),
+ Some(observation_id),
+ None,
+ summary_payload,
+ ))
+ .await;
+ if let Err(error) = signal_result {
+ return Err(error);
+ }
+ Ok(result)
+ }
}
#[derive(Debug, Clone, Default)]
diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs
index f4e5d9e..7d9e2f5 100644
--- a/kb_lib/src/trade_aggregation.rs
+++ b/kb_lib/src/trade_aggregation.rs
@@ -74,7 +74,7 @@ impl KbTradeAggregationService {
};
let mut results = std::vec::Vec::new();
for decoded_event in &decoded_events {
- if !decoded_event.event_kind.ends_with(".swap") {
+ if !kb_is_trade_event_kind(decoded_event.event_kind.as_str()) {
continue;
}
let decoded_event_id = match decoded_event.id {
@@ -135,6 +135,16 @@ impl KbTradeAggregationService {
)));
}
};
+ let pool_tokens_result =
+ crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
+ let pool_tokens = match pool_tokens_result {
+ Ok(pool_tokens) => pool_tokens,
+ Err(error) => return Err(error),
+ };
+ let base_vault_address =
+ kb_find_pool_token_vault_address_by_token_id(&pool_tokens, pair.base_token_id);
+ let quote_vault_address =
+ kb_find_pool_token_vault_address_by_token_id(&pool_tokens, pair.quote_token_id);
let payload_result =
serde_json::from_str::(decoded_event.payload_json.as_str());
let payload = match payload_result {
@@ -146,8 +156,8 @@ impl KbTradeAggregationService {
)));
}
};
- let trade_side = kb_extract_trade_side(&payload);
- let base_amount_raw = kb_extract_amount_string(
+ let trade_side = kb_extract_trade_side(decoded_event.event_kind.as_str(), &payload);
+ let mut base_amount_raw = kb_extract_amount_string(
&payload,
&[
"baseAmountRaw",
@@ -157,7 +167,7 @@ impl KbTradeAggregationService {
"amountInBase",
],
);
- let quote_amount_raw = kb_extract_amount_string(
+ let mut quote_amount_raw = kb_extract_amount_string(
&payload,
&[
"quoteAmountRaw",
@@ -167,45 +177,85 @@ impl KbTradeAggregationService {
"amountOutQuote",
],
);
- let price_quote_per_base =
- kb_compute_price_quote_per_base(base_amount_raw.clone(), quote_amount_raw.clone());
- let slot_i64 = kb_convert_slot_to_i64(transaction.slot);
- let created_trade_event = existing_trade_option.is_none();
- let trade_event_id = if let Some(existing_trade) = existing_trade_option {
- match existing_trade.id {
- Some(trade_event_id) => trade_event_id,
- None => {
- return Err(crate::KbError::InvalidState(
- "trade event has no internal id".to_string(),
- ));
- }
- }
- } else {
- let trade_event_dto = crate::KbTradeEventDto::new(
- pool.dex_id,
- pool_id,
- pair_id,
- transaction_id,
- decoded_event_id,
- transaction.signature.clone(),
- slot_i64,
- trade_side,
- pair.base_token_id,
- pair.quote_token_id,
- base_amount_raw.clone(),
- quote_amount_raw.clone(),
- price_quote_per_base,
- crate::KbObservationSourceKind::Dex,
- transaction.source_endpoint_name.clone(),
- decoded_event.payload_json.clone(),
+ let mut price_quote_per_base = None;
+ if decoded_event.event_kind.starts_with("pump_swap.")
+ && (base_amount_raw.is_none()
+ || quote_amount_raw.is_none()
+ || price_quote_per_base.is_none())
+ {
+ let inferred_result = kb_extract_pump_swap_amounts_from_transaction(
+ transaction.transaction_json.as_str(),
+ transaction.meta_json.as_deref(),
+ base_vault_address.as_deref(),
+ quote_vault_address.as_deref(),
);
- let upsert_result =
- crate::upsert_trade_event(self.database.as_ref(), &trade_event_dto).await;
- match upsert_result {
- Ok(trade_event_id) => trade_event_id,
+ let inferred = match inferred_result {
+ Ok(inferred) => inferred,
Err(error) => return Err(error),
+ };
+ if base_amount_raw.is_none() {
+ base_amount_raw = inferred.0;
}
+ if quote_amount_raw.is_none() {
+ quote_amount_raw = inferred.1;
+ }
+ if price_quote_per_base.is_none() {
+ price_quote_per_base = inferred.2;
+ }
+ }
+ if price_quote_per_base.is_none() {
+ price_quote_per_base = kb_compute_price_quote_per_base_with_decimals(
+ transaction.meta_json.as_deref(),
+ transaction.transaction_json.as_str(),
+ base_vault_address.as_deref(),
+ quote_vault_address.as_deref(),
+ );
+ }
+ let slot_i64 = kb_convert_slot_to_i64(transaction.slot);
+ let existing_trade_was_empty = match &existing_trade_option {
+ Some(existing_trade) => {
+ existing_trade.base_amount_raw.is_none()
+ && existing_trade.quote_amount_raw.is_none()
+ && existing_trade.price_quote_per_base.is_none()
+ }
+ None => false,
};
+ let trade_event_dto = crate::KbTradeEventDto::new(
+ pool.dex_id,
+ pool_id,
+ pair_id,
+ transaction_id,
+ decoded_event_id,
+ transaction.signature.clone(),
+ slot_i64,
+ trade_side,
+ pair.base_token_id,
+ pair.quote_token_id,
+ base_amount_raw.clone(),
+ quote_amount_raw.clone(),
+ price_quote_per_base,
+ crate::KbObservationSourceKind::Dex,
+ transaction.source_endpoint_name.clone(),
+ decoded_event.payload_json.clone(),
+ );
+ tracing::debug!(
+ event_kind = %decoded_event.event_kind,
+ pool_account = ?decoded_event.pool_account,
+ decoded_event_id = ?decoded_event.id,
+ "trade aggregation candidate"
+ );
+ let upsert_result =
+ crate::upsert_trade_event(self.database.as_ref(), &trade_event_dto).await;
+ let trade_event_id = match upsert_result {
+ Ok(trade_event_id) => trade_event_id,
+ Err(error) => return Err(error),
+ };
+ let created_trade_event = existing_trade_option.is_none();
+ let repaired_trade_event = !created_trade_event
+ && existing_trade_was_empty
+ && (base_amount_raw.is_some()
+ || quote_amount_raw.is_some()
+ || price_quote_per_base.is_some());
let pair_metric_result =
crate::get_pair_metric_by_pair_id(self.database.as_ref(), pair_id).await;
let pair_metric_option = match pair_metric_result {
@@ -221,7 +271,7 @@ impl KbTradeAggregationService {
));
}
};
- if created_trade_event {
+ if created_trade_event || repaired_trade_event {
let mut updated_metric = existing_metric.clone();
kb_apply_trade_to_pair_metric(
&mut updated_metric,
@@ -310,6 +360,19 @@ impl KbTradeAggregationService {
}
}
+fn kb_is_trade_event_kind(event_kind: &str) -> bool {
+ if event_kind.ends_with(".swap") {
+ return true;
+ }
+ if event_kind.ends_with(".buy") {
+ return true;
+ }
+ if event_kind.ends_with(".sell") {
+ return true;
+ }
+ false
+}
+
fn kb_convert_slot_to_i64(slot: std::option::Option) -> std::option::Option {
match slot {
Some(slot) => match i64::try_from(slot) {
@@ -320,13 +383,20 @@ fn kb_convert_slot_to_i64(slot: std::option::Option) -> std::option::Option
}
}
-fn kb_extract_trade_side(payload: &serde_json::Value) -> crate::KbSwapTradeSide {
+fn kb_extract_trade_side(event_kind: &str, payload: &serde_json::Value) -> crate::KbSwapTradeSide {
let trade_side_option = kb_extract_string_by_candidate_keys(payload, &["tradeSide"]);
match trade_side_option.as_deref() {
- Some("BuyBase") => crate::KbSwapTradeSide::BuyBase,
- Some("SellBase") => crate::KbSwapTradeSide::SellBase,
- _ => crate::KbSwapTradeSide::Unknown,
+ Some("BuyBase") => return crate::KbSwapTradeSide::BuyBase,
+ Some("SellBase") => return crate::KbSwapTradeSide::SellBase,
+ _ => {}
}
+ if event_kind.ends_with(".buy") {
+ return crate::KbSwapTradeSide::BuyBase;
+ }
+ if event_kind.ends_with(".sell") {
+ return crate::KbSwapTradeSide::SellBase;
+ }
+ crate::KbSwapTradeSide::Unknown
}
fn kb_extract_amount_string(
@@ -336,34 +406,6 @@ fn kb_extract_amount_string(
kb_extract_scalar_as_string_by_candidate_keys(payload, candidate_keys)
}
-fn kb_compute_price_quote_per_base(
- base_amount_raw: std::option::Option,
- quote_amount_raw: std::option::Option,
-) -> std::option::Option {
- let base_amount_text = match base_amount_raw {
- Some(base_amount_text) => base_amount_text,
- None => return None,
- };
- let quote_amount_text = match quote_amount_raw {
- Some(quote_amount_text) => quote_amount_text,
- None => return None,
- };
- let base_amount_result = base_amount_text.parse::();
- let base_amount = match base_amount_result {
- Ok(base_amount) => base_amount,
- Err(_) => return None,
- };
- if base_amount <= 0.0 {
- return None;
- }
- let quote_amount_result = quote_amount_text.parse::();
- let quote_amount = match quote_amount_result {
- Ok(quote_amount) => quote_amount,
- Err(_) => return None,
- };
- Some(quote_amount / base_amount)
-}
-
fn kb_apply_trade_to_pair_metric(
metric: &mut crate::KbPairMetricDto,
slot: std::option::Option,
@@ -495,10 +537,292 @@ fn kb_extract_scalar_as_string_by_candidate_keys(
}
}
}
-
None
}
+fn kb_find_pool_token_vault_address_by_token_id(
+ pool_tokens: &[crate::KbPoolTokenDto],
+ token_id: i64,
+) -> std::option::Option {
+ for pool_token in pool_tokens {
+ if pool_token.token_id != token_id {
+ continue;
+ }
+ let vault_address_option = pool_token.vault_address.clone();
+ let vault_address = match vault_address_option {
+ Some(vault_address) => vault_address.trim().to_string(),
+ None => continue,
+ };
+ if vault_address.is_empty() {
+ continue;
+ }
+ return Some(vault_address);
+ }
+ None
+}
+
+fn kb_extract_pump_swap_amounts_from_transaction(
+ transaction_json: &str,
+ meta_json: std::option::Option<&str>,
+ base_vault_address: std::option::Option<&str>,
+ quote_vault_address: std::option::Option<&str>,
+) -> Result<
+ (
+ std::option::Option,
+ std::option::Option,
+ std::option::Option,
+ ),
+ crate::KbError,
+> {
+ let meta_json = match meta_json {
+ Some(meta_json) => meta_json,
+ None => return Ok((None, None, None)),
+ };
+ let transaction_value_result = serde_json::from_str::(transaction_json);
+ let transaction_value = match transaction_value_result {
+ Ok(transaction_value) => transaction_value,
+ Err(error) => {
+ return Err(crate::KbError::Json(format!(
+ "cannot parse transaction_json for pump_swap amount extraction: {}",
+ error
+ )));
+ }
+ };
+ let meta_value_result = serde_json::from_str::(meta_json);
+ let meta_value = match meta_value_result {
+ Ok(meta_value) => meta_value,
+ Err(error) => {
+ return Err(crate::KbError::Json(format!(
+ "cannot parse meta_json for pump_swap amount extraction: {}",
+ error
+ )));
+ }
+ };
+ let account_keys_result = kb_extract_transaction_account_keys(&transaction_value);
+ let account_keys = match account_keys_result {
+ Ok(account_keys) => account_keys,
+ Err(error) => return Err(error),
+ };
+ let pre_balances_result =
+ kb_extract_token_balance_map(&meta_value, &account_keys, "preTokenBalances");
+ let pre_balances = match pre_balances_result {
+ Ok(pre_balances) => pre_balances,
+ Err(error) => return Err(error),
+ };
+ let post_balances_result =
+ kb_extract_token_balance_map(&meta_value, &account_keys, "postTokenBalances");
+ let post_balances = match post_balances_result {
+ Ok(post_balances) => post_balances,
+ Err(error) => return Err(error),
+ };
+ let mut base_amount_raw = None;
+ let mut quote_amount_raw = None;
+ let mut price_quote_per_base = None;
+ if let Some(base_vault_address) = base_vault_address {
+ let base_pre = pre_balances.get(base_vault_address);
+ let base_post = post_balances.get(base_vault_address);
+ let base_pre_raw = base_pre.map(|value| value.0.clone());
+ let base_post_raw = base_post.map(|value| value.0.clone());
+ base_amount_raw = kb_compute_amount_delta_abs(base_pre_raw, base_post_raw);
+ let base_pre_ui = base_pre.and_then(|value| value.1);
+ let base_post_ui = base_post.and_then(|value| value.1);
+ let base_delta_ui = kb_compute_ui_delta_abs(base_pre_ui, base_post_ui);
+ if let Some(quote_vault_address) = quote_vault_address {
+ let quote_pre = pre_balances.get(quote_vault_address);
+ let quote_post = post_balances.get(quote_vault_address);
+ let quote_pre_raw = quote_pre.map(|value| value.0.clone());
+ let quote_post_raw = quote_post.map(|value| value.0.clone());
+ quote_amount_raw = kb_compute_amount_delta_abs(quote_pre_raw, quote_post_raw);
+ let quote_pre_ui = quote_pre.and_then(|value| value.1);
+ let quote_post_ui = quote_post.and_then(|value| value.1);
+ let quote_delta_ui = kb_compute_ui_delta_abs(quote_pre_ui, quote_post_ui);
+ match (base_delta_ui, quote_delta_ui) {
+ (Some(base_delta_ui), Some(quote_delta_ui)) => {
+ if base_delta_ui > 0.0 {
+ price_quote_per_base = Some(quote_delta_ui / base_delta_ui);
+ }
+ }
+ _ => {}
+ }
+ }
+ }
+ Ok((base_amount_raw, quote_amount_raw, price_quote_per_base))
+}
+
+fn kb_extract_transaction_account_keys(
+ transaction_value: &serde_json::Value,
+) -> Result, crate::KbError> {
+ let candidate_arrays = [
+ transaction_value
+ .get("message")
+ .and_then(|value| value.get("accountKeys")),
+ transaction_value
+ .get("transaction")
+ .and_then(|value| value.get("message"))
+ .and_then(|value| value.get("accountKeys")),
+ transaction_value.get("accountKeys"),
+ ];
+ for candidate_array_option in candidate_arrays {
+ let candidate_array = match candidate_array_option {
+ Some(candidate_array) => candidate_array,
+ None => continue,
+ };
+ let array = match candidate_array.as_array() {
+ Some(array) => array,
+ None => continue,
+ };
+ let mut account_keys = std::vec::Vec::new();
+ for item in array {
+ if let Some(value) = item.as_str() {
+ account_keys.push(value.to_string());
+ continue;
+ }
+ let pubkey_option = item.get("pubkey").and_then(|value| value.as_str());
+ if let Some(pubkey) = pubkey_option {
+ account_keys.push(pubkey.to_string());
+ continue;
+ }
+ }
+ if !account_keys.is_empty() {
+ return Ok(account_keys);
+ }
+ }
+ Err(crate::KbError::Json(
+ "cannot extract accountKeys from transaction_json".to_string(),
+ ))
+}
+
+fn kb_extract_token_balance_map(
+ meta_value: &serde_json::Value,
+ account_keys: &[std::string::String],
+ field_name: &str,
+) -> Result<
+ std::collections::BTreeMap<
+ std::string::String,
+ (std::string::String, std::option::Option),
+ >,
+ crate::KbError,
+> {
+ let mut result = std::collections::BTreeMap::<
+ std::string::String,
+ (std::string::String, std::option::Option),
+ >::new();
+ let balances_option = meta_value
+ .get(field_name)
+ .and_then(|value| value.as_array());
+ let balances = match balances_option {
+ Some(balances) => balances,
+ None => return Ok(result),
+ };
+ for balance in balances {
+ let account_index_option = balance.get("accountIndex").and_then(|value| value.as_u64());
+ let account_index = match account_index_option {
+ Some(account_index) => account_index as usize,
+ None => continue,
+ };
+ if account_index >= account_keys.len() {
+ continue;
+ }
+ let account_address = account_keys[account_index].clone();
+ let ui_token_amount_option = balance.get("uiTokenAmount");
+ let ui_token_amount = match ui_token_amount_option {
+ Some(ui_token_amount) => ui_token_amount,
+ None => continue,
+ };
+ let raw_amount_option = ui_token_amount
+ .get("amount")
+ .and_then(|value| value.as_str());
+ let raw_amount = match raw_amount_option {
+ Some(raw_amount) => raw_amount.to_string(),
+ None => continue,
+ };
+ let ui_amount_string_option = ui_token_amount
+ .get("uiAmountString")
+ .and_then(|value| value.as_str());
+ let ui_amount = match ui_amount_string_option {
+ Some(ui_amount_string) => {
+ let parse_result = ui_amount_string.parse::();
+ match parse_result {
+ Ok(ui_amount) => Some(ui_amount),
+ Err(_) => None,
+ }
+ }
+ None => None,
+ };
+ result.insert(account_address, (raw_amount, ui_amount));
+ }
+ Ok(result)
+}
+
+fn kb_compute_amount_delta_abs(
+ pre_amount: std::option::Option,
+ post_amount: std::option::Option,
+) -> std::option::Option {
+ let pre_amount = match pre_amount {
+ Some(pre_amount) => pre_amount,
+ None => "0".to_string(),
+ };
+ let post_amount = match post_amount {
+ Some(post_amount) => post_amount,
+ None => "0".to_string(),
+ };
+ let pre_value_result = pre_amount.parse::();
+ let pre_value = match pre_value_result {
+ Ok(pre_value) => pre_value,
+ Err(_) => return None,
+ };
+ let post_value_result = post_amount.parse::();
+ let post_value = match post_value_result {
+ Ok(post_value) => post_value,
+ Err(_) => return None,
+ };
+ let delta = if post_value >= pre_value {
+ post_value - pre_value
+ } else {
+ pre_value - post_value
+ };
+ Some(delta.to_string())
+}
+
+fn kb_compute_ui_delta_abs(
+ pre_amount: std::option::Option,
+ post_amount: std::option::Option,
+) -> std::option::Option {
+ let pre_amount = match pre_amount {
+ Some(pre_amount) => pre_amount,
+ None => 0.0,
+ };
+ let post_amount = match post_amount {
+ Some(post_amount) => post_amount,
+ None => 0.0,
+ };
+ let delta = if post_amount >= pre_amount {
+ post_amount - pre_amount
+ } else {
+ pre_amount - post_amount
+ };
+ Some(delta)
+}
+
+fn kb_compute_price_quote_per_base_with_decimals(
+ meta_json: std::option::Option<&str>,
+ transaction_json: &str,
+ base_vault_address: std::option::Option<&str>,
+ quote_vault_address: std::option::Option<&str>,
+) -> std::option::Option {
+ let inferred_result = kb_extract_pump_swap_amounts_from_transaction(
+ transaction_json,
+ meta_json,
+ base_vault_address,
+ quote_vault_address,
+ );
+ let inferred = match inferred_result {
+ Ok(inferred) => inferred,
+ Err(_) => return None,
+ };
+ inferred.2
+}
+
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc {
diff --git a/khadhroony-bobobot-v0.7.24-pre.1.zip b/khadhroony-bobobot-v0.7.24-pre.1.zip
new file mode 100644
index 0000000..5708ff3
Binary files /dev/null and b/khadhroony-bobobot-v0.7.24-pre.1.zip differ