diff --git a/CHANGELOG.md b/CHANGELOG.md index 8161508..3bc3288 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,3 +29,4 @@ 0.6.3 - Enrichissement des notifications WebSocket utiles : extraction améliorée de pubkey, signature, owner, parsed account type et slot pour account/logs/signature notifications 0.6.4 - Premières règles de détection technique pour candidats pools/listings depuis programNotification en s’appuyant sur les DEX connus en base 0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection +0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent diff --git a/Cargo.toml b/Cargo.toml index 5d20d1e..f88990e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.5" +version = "0.6.6" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 2d0a8b1..0d379e3 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -436,7 +436,18 @@ Réalisé : - branchement optionnel du relais de détection WS sur tous les clients orchestrés, - préparation des futures politiques de répartition, supervision et reconnexion. -### 6.031. Version `0.7.0` — Résolution transactionnelle orientée DEX +### 6.031. Version `0.6.6` — Démo légère `WsManager` dans `kb_app` +Réalisé : + +- ajout d’une fenêtre `Demo Ws Manager` dans `kb_app`, +- ouverture depuis la fenêtre principale, +- affichage du snapshot consolidé du `WsManager`, +- pilotage des endpoints WS gérés via `start/stop all` et `start/stop role`, +- visualisation du flux unifié de `WsEvent`, +- validation UI du branchement centralisé du relais de détection, +- amélioration des messages de log UI pour les actions idempotentes déjà démarrées ou déjà arrêtées. + +### 6.032. Version `0.7.0` — Résolution transactionnelle orientée DEX Objectif : relier la détection temps réel aux transactions Solana complètes. À faire : @@ -446,7 +457,7 @@ Objectif : relier la détection temps réel aux transactions Solana complètes. - utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS, - éviter qu’une notification intéressante reste au niveau d’un simple signal technique sans résolution métier. -### 6.032. Version `0.7.1` — Modèle transactionnel Solana enrichi +### 6.033. Version `0.7.1` — Modèle transactionnel Solana enrichi Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slot -> signature -> instruction`. À faire : @@ -456,7 +467,7 @@ Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slo - conserver la possibilité de relier plus tard un pool, un token ou un wallet à une signature fondatrice, - préparer l’historique transactionnel nécessaire aux futurs décodeurs DEX. -### 6.033. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version +### 6.034. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés. À faire : @@ -466,7 +477,7 @@ Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust d - encapsuler les index de comptes et les motifs de logs propres à chaque protocole, - prévoir des décodeurs séparés au minimum pour Raydium, Pump.fun / PumpSwap, Meteora, puis les autres cibles. -### 6.034. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction +### 6.035. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RPC et des transactions enrichies. À faire : @@ -476,7 +487,7 @@ Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RP - extraire token A, token B, LP mint, vaults et comptes utiles quand cela est possible, - alimenter `kb_pools`, `kb_pairs`, `kb_pool_tokens` et `kb_pool_listings` avec des données plus fiables que la seule détection de comptes. -### 6.035. Version `0.7.4` — Modèle métier DEX enrichi +### 6.036. Version `0.7.4` — Modèle métier DEX enrichi Objectif : faire converger la détection technique et le modèle métier vers une vision proche de l’activité réelle du marché. À faire : @@ -486,7 +497,7 @@ Objectif : faire converger la détection technique et le modèle métier vers un - préparer une vision cohérente `token <-> pool <-> pair <-> protocole`, - distinguer les objets de référence des événements d’activité. -### 6.036. Version `0.7.5` — Wallets, holdings et participants observés +### 6.037. Version `0.7.5` — Wallets, holdings et participants observés Objectif : préparer le suivi des acteurs on-chain autour des pools et tokens détectés. À faire : @@ -496,7 +507,7 @@ Objectif : préparer le suivi des acteurs on-chain autour des pools et tokens d - préparer l’identification des créateurs, mint authorities, wallets d’activité et contreparties, - éviter de limiter l’analyse future au seul niveau token/pool sans vision des participants. -### 6.037. Version `0.7.6` — Séries de prix, volumes et agrégats DEX +### 6.038. Version `0.7.6` — Séries de prix, volumes et agrégats DEX Objectif : préparer la couche analytique fine à partir des événements métier normalisés. À faire : @@ -506,7 +517,7 @@ Objectif : préparer la couche analytique fine à partir des événements métie - permettre plus tard le calcul d’OHLCV, volume, nombre de trades et liquidité par fenêtre, - préparer le terrain pour la couche analytique `0.8.x`. -### 6.038. Version `0.7.x` — DEX connectors v1 +### 6.039. Version `0.7.x` — DEX connectors v1 Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier. Cibles initiales possibles : @@ -526,7 +537,7 @@ Résultat attendu : - création d’objets métier riches pour tokens, pools, paires, wallets, holdings et séries de prix, - remplacement progressif des scripts heuristiques externes par des composants Rust intégrés. -### 6.039. Version `0.8.x` — Analyse et filtrage +### 6.040. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -537,7 +548,7 @@ Objectif : transformer les événements bruts en signaux exploitables. - statistiques de comportement, - premiers patterns. -### 6.040. Version `1.x.y` — Wallets et swap préparatoire +### 6.041. Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -548,7 +559,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.041. Version `2.x.y` — Trading semi-automatisé +### 6.042. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -559,7 +570,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.042. Version `3.x.y` — Yellowstone gRPC +### 6.043. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : @@ -580,10 +591,12 @@ Modules cibles à court terme : - `constants.rs` - `types.rs` - `ws_client.rs` +- `ws_manager.rs` - `http_client.rs` -- `rpc_json.rs` -- `rpc_ws.rs` -- `rpc_ws_solana.rs` +- `http_pool.rs` +- `json_rpc_ws.rs` +- `solana_pubsub_ws.rs` +- `detect.rs` ### 7.2. `kb_app` Responsabilités cibles : @@ -644,9 +657,10 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate La priorité immédiate est désormais la suivante : -1. démarrer la version `0.6.3` avec l’enrichissement des notifications WS utiles, -2. améliorer l’extraction des métadonnées utiles depuis `accountNotification`, `logsNotification` et `signatureNotification`, -3. produire des observations on-chain plus précises et homogènes, -4. préparer ensuite la version `0.6.4` pour les premières règles de détection technique, -5. conserver le découplage entre transport, détection et stockage, -6. planifier enfin `0.6.5` pour l’orchestration multi-clients WebSocket via `ws_pool.rs` ou `ws_manager.rs`. +1. démarrer la version `0.7.0` avec la résolution transactionnelle orientée DEX, +2. introduire une file de signatures ou de résolutions alimentée par les flux WS utiles, +3. corréler `logsNotification`, `programNotification` et `signatureNotification` avec des appels `getTransaction`, +4. utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS, +5. préparer ensuite la version `0.7.1` pour le modèle transactionnel Solana enrichi, +6. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage. + diff --git a/kb_app/capabilities/default.json b/kb_app/capabilities/default.json index 8f272a1..2d822f0 100644 --- a/kb_app/capabilities/default.json +++ b/kb_app/capabilities/default.json @@ -6,7 +6,8 @@ "main", "splash", "demo_ws", - "demo_http" + "demo_http", + "demo_ws_manager" ], "permissions": [ "core:default", diff --git a/kb_app/frontend/demo_ws_manager.html b/kb_app/frontend/demo_ws_manager.html new file mode 100644 index 0000000..9f0af67 --- /dev/null +++ b/kb_app/frontend/demo_ws_manager.html @@ -0,0 +1,109 @@ + + + + + + + + Khadhroony-BoBoBot — Demo Ws Manager + + + + +
+ +
+ +
+
+
+
+
+
+
+

Pilotage

+

+ Démo légère du WsManager : démarrage/arrêt groupé, pilotage par rôle et bus unifié d’événements. +

+ +
+ + + +
+ +
+ + +
+ +
+ + +
+ +
+
Managed endpoints: 0
+
Started endpoints: 0
+
+
+
+
+ +
+
+
+

Snapshot

+
+ + + + + + + + + + + +
EndpointProviderRolesStateSubs.
+
+
+
+ +
+
+
+

Unified event log

+ +
+ +
+
+
+
+
+
+
+ + + + + + + diff --git a/kb_app/frontend/main.html b/kb_app/frontend/main.html index f5993a1..48cffa0 100644 --- a/kb_app/frontend/main.html +++ b/kb_app/frontend/main.html @@ -25,6 +25,9 @@ + @@ -54,6 +57,10 @@ Les tests PRC Http manuels sont disponibles dans la fenêtre dédiée Demo Http.

+

+ La démonstration légère de pilotage multi-clients est disponible dans la fenêtre + Demo Ws Manager. +

+

diff --git a/kb_app/frontend/ts/demo_http.ts b/kb_app/frontend/ts/demo_http.ts index 03ad29b..0b20105 100644 --- a/kb_app/frontend/ts/demo_http.ts +++ b/kb_app/frontend/ts/demo_http.ts @@ -268,6 +268,8 @@ async function refreshPoolSnapshot( document.addEventListener("DOMContentLoaded", async () => { void takeoverConsole(); + debug("demo_http window loaded"); + const sidebarToggle = document.querySelector('#sidebarToggle'); if (sidebarToggle) { // restaurer l’état depuis localStorage @@ -432,6 +434,4 @@ document.addEventListener("DOMContentLoaded", async () => { appendLogLine(logTextarea, "[ui] demo_http window loaded"); await refreshPoolSnapshot(poolTableBody, logTextarea, true); - - debug("demo_http window loaded"); }); \ No newline at end of file diff --git a/kb_app/frontend/ts/demo_ws.ts b/kb_app/frontend/ts/demo_ws.ts index 7124e53..459a799 100644 --- a/kb_app/frontend/ts/demo_ws.ts +++ b/kb_app/frontend/ts/demo_ws.ts @@ -240,6 +240,9 @@ function applyStatusToUi( document.addEventListener("DOMContentLoaded", async () => { void takeoverConsole(); + + debug("demo_ws window loaded"); + const sidebarToggle = document.querySelector('#sidebarToggle'); if (sidebarToggle) { // restaurer l’état depuis localStorage @@ -514,6 +517,4 @@ document.addEventListener("DOMContentLoaded", async () => { unlistenStatusEvent(); } }); - - debug("demo_ws window loaded"); }); \ No newline at end of file diff --git a/kb_app/frontend/ts/demo_ws_manager.ts b/kb_app/frontend/ts/demo_ws_manager.ts new file mode 100644 index 0000000..4c03587 --- /dev/null +++ b/kb_app/frontend/ts/demo_ws_manager.ts @@ -0,0 +1,237 @@ +// file: kb_app/frontend/ts/demo_ws_manager.ts + +import * as bootstrap from "bootstrap"; +import "simplebar"; +import ResizeObserver from "resize-observer-polyfill"; +import { invoke } from "@tauri-apps/api/core"; +import { listen } from "@tauri-apps/api/event"; +import { debug, takeoverConsole } from "@fltsci/tauri-plugin-tracing"; + +(window as Window & typeof globalThis & { bootstrap?: typeof bootstrap }).bootstrap = bootstrap; +(window as Window & typeof globalThis & { ResizeObserver?: typeof ResizeObserver }).ResizeObserver = ResizeObserver; + +type DemoWsManagerEndpointSummary = { + name: string; + resolvedUrl: string; + provider: string; + roles: string[]; + connectionState: string; + activeSubscriptionCount: number; +}; + +type DemoWsManagerSnapshotPayload = { + endpointCount: number; + startedCount: number; + endpoints: DemoWsManagerEndpointSummary[]; +}; + +const endpointCountText = document.querySelector("#demoWsManagerEndpointCountText"); +const startedCountText = document.querySelector("#demoWsManagerStartedCountText"); +const roleSelect = document.querySelector("#demoWsManagerRoleSelect"); +const tableBody = document.querySelector("#demoWsManagerTableBody"); +const logTextarea = document.querySelector("#demoWsManagerLogTextarea"); +const startAllButton = document.querySelector("#demoWsManagerStartAllButton"); +const stopAllButton = document.querySelector("#demoWsManagerStopAllButton"); +const refreshButton = document.querySelector("#demoWsManagerRefreshButton"); +const startRoleButton = document.querySelector("#demoWsManagerStartRoleButton"); +const stopRoleButton = document.querySelector("#demoWsManagerStopRoleButton"); +const clearLogButton = document.querySelector("#demoWsManagerClearLogButton"); + +function appendLogLine(line: string): void { + if (!logTextarea) { + return; + } + const prefix = logTextarea.value.length > 0 ? "\n" : ""; + logTextarea.value += `${prefix}${line}`; + logTextarea.scrollTop = logTextarea.scrollHeight; +} + +function renderSnapshot(snapshot: DemoWsManagerSnapshotPayload): void { + if (endpointCountText) { + endpointCountText.textContent = String(snapshot.endpointCount); + } + if (startedCountText) { + startedCountText.textContent = String(snapshot.startedCount); + } + if (!tableBody) { + return; + } + + tableBody.innerHTML = ""; + + for (const endpoint of snapshot.endpoints) { + const row = document.createElement("tr"); + row.innerHTML = ` + +
${endpoint.name}
+
${endpoint.resolvedUrl}
+ + ${endpoint.provider} + ${endpoint.roles.join(", ")} + ${endpoint.connectionState} + ${endpoint.activeSubscriptionCount} + `; + tableBody.appendChild(row); + } +} + +async function refreshSnapshot(): Promise { + try { + const snapshot = await invoke("demo_ws_manager_get_snapshot"); + renderSnapshot(snapshot); + appendLogLine("[ui] refreshed manager snapshot"); + } catch (error) { + appendLogLine(`[ui] snapshot error: ${String(error)}`); + } +} + +async function loadRoles(): Promise { + if (!roleSelect) { + return; + } + + roleSelect.innerHTML = ""; + + try { + const roles = await invoke("demo_ws_manager_list_roles"); + for (const role of roles) { + const option = document.createElement("option"); + option.value = role; + option.textContent = role; + roleSelect.appendChild(option); + } + } catch (error) { + appendLogLine(`[ui] list roles error: ${String(error)}`); + } +} + +async function startAll(): Promise { + try { + const snapshot = await invoke("demo_ws_manager_start_all"); + renderSnapshot(snapshot); + } catch (error) { + appendLogLine(`[ui] start all error: ${String(error)}`); + } +} + +async function stopAll(): Promise { + try { + const snapshot = await invoke("demo_ws_manager_stop_all"); + renderSnapshot(snapshot); + } catch (error) { + appendLogLine(`[ui] stop all error: ${String(error)}`); + } +} + +async function startRole(): Promise { + if (!roleSelect || roleSelect.value.trim().length === 0) { + appendLogLine("[ui] no role selected"); + return; + } + + try { + const snapshot = await invoke("demo_ws_manager_start_role", { + role: roleSelect.value, + }); + renderSnapshot(snapshot); + } catch (error) { + appendLogLine(`[ui] start role error: ${String(error)}`); + } +} + +async function stopRole(): Promise { + if (!roleSelect || roleSelect.value.trim().length === 0) { + appendLogLine("[ui] no role selected"); + return; + } + + try { + const snapshot = await invoke("demo_ws_manager_stop_role", { + role: roleSelect.value, + }); + renderSnapshot(snapshot); + } catch (error) { + appendLogLine(`[ui] stop role error: ${String(error)}`); + } +} + +document.addEventListener("DOMContentLoaded", async () => { + void takeoverConsole(); + debug("demo_ws_manager window loaded"); + + const sidebarToggle = document.querySelector('#sidebarToggle'); + if (sidebarToggle) { + // restaurer l’état depuis localStorage + if (localStorage.getItem('sidebar-toggle') === 'true') { + document.body.classList.add('sidenav-toggled'); + } + + sidebarToggle.addEventListener('click', (event) => { + event.preventDefault(); + document.body.classList.toggle('sidenav-toggled'); + localStorage.setItem('sidebar-toggle', document.body.classList.contains('sidenav-toggled') ? 'true' : 'false'); + }); + } + + const tooltipTriggerList = document.querySelectorAll('[data-bs-toggle="tooltip"]'); + Array.from(tooltipTriggerList).map(tooltipTriggerEl => new bootstrap.Tooltip(tooltipTriggerEl)); + const toastElList = document.querySelectorAll('.toast'); + Array.from(toastElList).map(toastEl => new bootstrap.Toast(toastEl)); + const popoverTriggerList = document.querySelectorAll('[data-bs-toggle="popover"]'); + Array.from(popoverTriggerList).map(popoverTriggerEl => new bootstrap.Popover(popoverTriggerEl)); + + const gobackto = location.pathname + location.search; + + document.querySelectorAll('a[data-setlang]').forEach((a) => { + const href = a.getAttribute("href"); + if (!href) return; // pas de href => on ignore + + const url = new URL(href, location.origin); + url.searchParams.set("gobackto", gobackto); + + // conserve une URL relative (path + query) + a.setAttribute("href", url.pathname + "?" + url.searchParams.toString()); + }); + + if (startAllButton) { + startAllButton.addEventListener("click", () => { + void startAll(); + }); + } + if (stopAllButton) { + stopAllButton.addEventListener("click", () => { + void stopAll(); + }); + } + if (refreshButton) { + refreshButton.addEventListener("click", () => { + void refreshSnapshot(); + }); + } + if (startRoleButton) { + startRoleButton.addEventListener("click", () => { + void startRole(); + }); + } + if (stopRoleButton) { + stopRoleButton.addEventListener("click", () => { + void stopRole(); + }); + } + if (clearLogButton && logTextarea) { + clearLogButton.addEventListener("click", () => { + logTextarea.value = ""; + }); + } + + await listen("kb-demo-ws-manager-log", (event) => { + appendLogLine(event.payload); + }); + + await listen("kb-demo-ws-manager-snapshot", (event) => { + renderSnapshot(event.payload); + }); + + await loadRoles(); + await refreshSnapshot(); +}); diff --git a/kb_app/frontend/ts/main.ts b/kb_app/frontend/ts/main.ts index 4b9605c..81112d9 100644 --- a/kb_app/frontend/ts/main.ts +++ b/kb_app/frontend/ts/main.ts @@ -23,6 +23,14 @@ async function openDemoHttpWindow(): Promise { console.error("open_demo_http_window failed:", error); } } + +async function openDemoWsManagerWindow(): Promise { + try { + await invoke("open_demo_ws_manager_window"); + } catch (error) { + console.error("open_demo_ws_manager_window failed:", error); + } +} document.addEventListener("DOMContentLoaded", async () => { void takeoverConsole(); const sidebarToggle = document.querySelector('#sidebarToggle'); @@ -64,6 +72,8 @@ document.addEventListener("DOMContentLoaded", async () => { const openDemoHttpButton = document.querySelector("#openDemoHttpButton"); const openDemoHttpButtonSecondary = document.querySelector("#openDemoHttpButtonSecondary"); + const openDemoWsManagerButton = document.querySelector("#openDemoWsManagerButton"); + const openDemoWsManagerButtonSecondary = document.querySelector("#openDemoWsManagerButtonSecondary"); if (openDemoWsButton) { openDemoWsButton.addEventListener("click", () => { @@ -88,6 +98,18 @@ document.addEventListener("DOMContentLoaded", async () => { void openDemoHttpWindow(); }); } + + if (openDemoWsManagerButton) { + openDemoWsManagerButton.addEventListener("click", () => { + void openDemoWsManagerWindow(); + }); + } + + if (openDemoWsManagerButtonSecondary) { + openDemoWsManagerButtonSecondary.addEventListener("click", () => { + void openDemoWsManagerWindow(); + }); + } debug("window loaded"); diff --git a/kb_app/gen/schemas/capabilities.json b/kb_app/gen/schemas/capabilities.json index 942cafe..99db2a1 100644 --- a/kb_app/gen/schemas/capabilities.json +++ b/kb_app/gen/schemas/capabilities.json @@ -1 +1 @@ -{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http"],"permissions":["core:default","tracing:default"]}} \ No newline at end of file +{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager"],"permissions":["core:default","tracing:default"]}} \ No newline at end of file diff --git a/kb_app/package.json b/kb_app/package.json index e2b8c88..1c58b35 100644 --- a/kb_app/package.json +++ b/kb_app/package.json @@ -1,7 +1,7 @@ { "name": "kb-app", "private": true, - "version": "0.4.4", + "version": "0.6.6", "type": "module", "scripts": { "dev": "vite", diff --git a/kb_app/src/demo_ws_manager.rs b/kb_app/src/demo_ws_manager.rs new file mode 100644 index 0000000..974ee4a --- /dev/null +++ b/kb_app/src/demo_ws_manager.rs @@ -0,0 +1,565 @@ +// file: kb_app/src/demo_ws_manager.rs + +//! Demo WebSocket manager window commands and runtime state. +//! +//! This module provides a lightweight test bench for `kb_lib::WsManager`. + +use tauri::Emitter; +use tauri::Manager; + +/// Static endpoint summary enriched with current manager state. +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoWsManagerEndpointSummary { + name: std::string::String, + resolved_url: std::string::String, + provider: std::string::String, + roles: std::vec::Vec, + connection_state: std::string::String, + active_subscription_count: usize, +} + +/// Global demo manager snapshot payload. +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct KbDemoWsManagerSnapshotPayload { + endpoint_count: usize, + started_count: usize, + endpoints: std::vec::Vec, +} + +/// Runtime state for the demo WebSocket manager window. +#[derive(Debug)] +pub(crate) struct KbDemoWsManagerRuntimeState { + relay_task: std::option::Option>, +} + +impl KbDemoWsManagerRuntimeState { + /// Creates a new empty runtime state. + pub(crate) fn new() -> Self { + Self { relay_task: None } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct DemoWsManagerActionResult { + action: std::string::String, + target: std::string::String, + matched_count: usize, + changed_count: usize, + unchanged_count: usize, +} + +/// Shows and focuses the preconfigured `demo_ws_manager` window. +#[tauri::command] +pub(crate) async fn open_demo_ws_manager_window( + app_handle: tauri::AppHandle, + state: tauri::State<'_, crate::KbAppState>, +) -> Result<(), std::string::String> { + kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; + let existing_window_option = app_handle.get_webview_window("demo_ws_manager"); + let demo_window = match existing_window_option { + Some(demo_window) => demo_window, + None => { + let builder = tauri::WebviewWindowBuilder::new( + &app_handle, + "demo_ws_manager", + tauri::WebviewUrl::App("demo_ws_manager.html".into()), + ) + .title("Demo Ws Manager") + .inner_size(1280.0, 800.0) + .min_inner_size(900.0, 620.0) + .center() + .visible(true) + .transparent(false) + .decorations(true); + let build_result = builder.build(); + match build_result { + Ok(window) => window, + Err(error) => { + return Err(format!("cannot create demo_ws_manager window: {error:?}")); + } + } + } + }; + let show_result = demo_window.show(); + if let Err(error) = show_result { + return Err(format!("cannot show demo_ws_manager window: {error:?}")); + } + let focus_result = demo_window.set_focus(); + if let Err(error) = focus_result { + return Err(format!("cannot focus demo_ws_manager window: {error:?}")); + } + kb_emit_demo_ws_manager_log(&app_handle, "[ui] demo_ws_manager window loaded"); + kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; + Ok(()) +} + +/// Returns the current manager snapshot. +#[tauri::command] +pub(crate) async fn demo_ws_manager_get_snapshot( + state: tauri::State<'_, crate::KbAppState>, +) -> Result { + kb_build_demo_ws_manager_snapshot(&state).await +} + +/// Returns the distinct configured roles for enabled websocket endpoints. +#[tauri::command] +pub(crate) async fn demo_ws_manager_list_roles( + state: tauri::State<'_, crate::KbAppState>, +) -> Result, std::string::String> { + let mut roles = std::collections::BTreeSet::new(); + for endpoint in &state.config.solana.ws_endpoints { + if !endpoint.enabled { + continue; + } + for role in &endpoint.roles { + roles.insert(role.clone()); + } + } + Ok(roles.into_iter().collect()) +} + +/// Starts all managed websocket endpoints. +#[tauri::command] +pub(crate) async fn demo_ws_manager_start_all( + app_handle: tauri::AppHandle, + state: tauri::State<'_, crate::KbAppState>, +) -> Result { + kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; + let matched_count = state.ws_manager.endpoint_names().await.len(); + let start_result = state.ws_manager.start_all().await; + let changed_count = match start_result { + Ok(changed_count) => changed_count, + Err(error) => return Err(error.to_string()), + }; + let action_result = kb_build_action_result("start", "all", matched_count, changed_count); + kb_emit_demo_ws_manager_log( + &app_handle, + kb_format_action_result_for_log(&action_result).as_str(), + ); + kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; + kb_build_demo_ws_manager_snapshot(&state).await +} + +/// Stops all managed websocket endpoints. +#[tauri::command] +pub(crate) async fn demo_ws_manager_stop_all( + app_handle: tauri::AppHandle, + state: tauri::State<'_, crate::KbAppState>, +) -> Result { + let matched_count = state.ws_manager.endpoint_names().await.len(); + let stop_result = state.ws_manager.stop_all().await; + let changed_count = match stop_result { + Ok(changed_count) => changed_count, + Err(error) => return Err(error.to_string()), + }; + let action_result = kb_build_action_result("stop", "all", matched_count, changed_count); + kb_emit_demo_ws_manager_log( + &app_handle, + kb_format_action_result_for_log(&action_result).as_str(), + ); + kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; + kb_build_demo_ws_manager_snapshot(&state).await +} + +/// Starts all managed websocket endpoints having the selected role. +#[tauri::command] +pub(crate) async fn demo_ws_manager_start_role( + app_handle: tauri::AppHandle, + state: tauri::State<'_, crate::KbAppState>, + role: std::string::String, +) -> Result { + kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; + let matched_count = state + .ws_manager + .endpoint_names_for_role(role.as_str()) + .await + .len(); + let start_result = state.ws_manager.start_role(role.as_str()).await; + let changed_count = match start_result { + Ok(changed_count) => changed_count, + Err(error) => return Err(error.to_string()), + }; + let action_result = + kb_build_action_result("start", role.as_str(), matched_count, changed_count); + kb_emit_demo_ws_manager_log( + &app_handle, + kb_format_action_result_for_log(&action_result).as_str(), + ); + kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; + kb_build_demo_ws_manager_snapshot(&state).await +} + +/// Stops all managed websocket endpoints having the selected role. +#[tauri::command] +pub(crate) async fn demo_ws_manager_stop_role( + app_handle: tauri::AppHandle, + state: tauri::State<'_, crate::KbAppState>, + role: std::string::String, +) -> Result { + let matched_count = state + .ws_manager + .endpoint_names_for_role(role.as_str()) + .await + .len(); + let stop_result = state.ws_manager.stop_role(role.as_str()).await; + let changed_count = match stop_result { + Ok(changed_count) => changed_count, + Err(error) => return Err(error.to_string()), + }; + let action_result = kb_build_action_result("stop", role.as_str(), matched_count, changed_count); + kb_emit_demo_ws_manager_log( + &app_handle, + kb_format_action_result_for_log(&action_result).as_str(), + ); + kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; + kb_build_demo_ws_manager_snapshot(&state).await +} + +async fn kb_build_demo_ws_manager_snapshot( + state: &tauri::State<'_, crate::KbAppState>, +) -> Result { + let snapshot_result = state.ws_manager.snapshot().await; + let snapshot = match snapshot_result { + Ok(snapshot) => snapshot, + Err(error) => return Err(error.to_string()), + }; + let mut endpoints = std::vec::Vec::new(); + for managed_endpoint in snapshot.endpoints { + let config_endpoint_option = state + .config + .find_ws_endpoint(&managed_endpoint.endpoint_name); + let config_endpoint = match config_endpoint_option { + Some(config_endpoint) => config_endpoint, + None => { + return Err(format!( + "managed websocket endpoint '{}' is missing from config", + managed_endpoint.endpoint_name + )); + } + }; + endpoints.push(KbDemoWsManagerEndpointSummary { + name: managed_endpoint.endpoint_name, + resolved_url: managed_endpoint.resolved_url, + provider: managed_endpoint.provider, + roles: config_endpoint.roles.clone(), + connection_state: kb_connection_state_to_string(managed_endpoint.state), + active_subscription_count: managed_endpoint.active_subscription_count, + }); + } + Ok(KbDemoWsManagerSnapshotPayload { + endpoint_count: snapshot.endpoint_count, + started_count: snapshot.started_count, + endpoints, + }) +} + +async fn kb_emit_demo_ws_manager_snapshot( + app_handle: &tauri::AppHandle, + state: &tauri::State<'_, crate::KbAppState>, +) { + let snapshot_result = kb_build_demo_ws_manager_snapshot(state).await; + let snapshot = match snapshot_result { + Ok(snapshot) => snapshot, + Err(error) => { + kb_emit_demo_ws_manager_log(app_handle, &format!("[ui] snapshot error: {error}")); + return; + } + }; + let emit_result = app_handle.emit("kb-demo-ws-manager-snapshot", snapshot); + if let Err(error) = emit_result { + tracing::error!("error emitting demo_ws_manager snapshot event: {error:?}"); + } +} + +async fn kb_ensure_demo_ws_manager_relay( + app_handle: &tauri::AppHandle, + state: &tauri::State<'_, crate::KbAppState>, +) { + let mut runtime_guard = state.demo_ws_manager_runtime.lock().await; + if runtime_guard.relay_task.is_some() { + return; + } + let mut receiver = state.ws_manager.subscribe_events(); + let relay_app_handle = app_handle.clone(); + let relay_state = state.demo_ws_manager_runtime.clone(); + let relay_task = tauri::async_runtime::spawn(async move { + loop { + let recv_result = receiver.recv().await; + match recv_result { + Ok(event) => { + let line = kb_format_ws_event(&event); + kb_emit_demo_ws_manager_log(&relay_app_handle, line.as_str()); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + kb_emit_demo_ws_manager_log( + &relay_app_handle, + &format!( + "[manager] event receiver lagged and skipped {} message(s)", + skipped + ), + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } + } + } + let mut runtime_guard = relay_state.lock().await; + runtime_guard.relay_task = None; + }); + + runtime_guard.relay_task = Some(relay_task); +} + +fn kb_emit_demo_ws_manager_log(app_handle: &tauri::AppHandle, message: &str) { + let emit_result = app_handle.emit("kb-demo-ws-manager-log", message.to_string()); + if let Err(error) = emit_result { + tracing::error!("error emitting demo_ws_manager log event: {error:?}"); + } +} + +fn kb_connection_state_to_string(state: kb_lib::KbConnectionState) -> std::string::String { + match state { + kb_lib::KbConnectionState::Disconnected => "Disconnected".to_string(), + kb_lib::KbConnectionState::Connecting => "Connecting".to_string(), + kb_lib::KbConnectionState::Connected => "Connected".to_string(), + kb_lib::KbConnectionState::Disconnecting => "Disconnecting".to_string(), + } +} + +fn kb_format_ws_event(event: &kb_lib::WsEvent) -> std::string::String { + match event { + kb_lib::WsEvent::Connected { + endpoint_name, + endpoint_url, + } => { + format!("[ws:{endpoint_name}] connected to {endpoint_url}") + } + kb_lib::WsEvent::TextMessage { + endpoint_name, + text, + } => { + format!("[ws:{endpoint_name}] text: {text}") + } + kb_lib::WsEvent::JsonRpcMessage { + endpoint_name, + message, + } => match message { + kb_lib::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => { + format!( + "[ws:{endpoint_name}] json-rpc success id={} result={}", + response.id, response.result + ) + } + kb_lib::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => { + format!( + "[ws:{endpoint_name}] json-rpc error id={} code={} message={}", + response.id, response.error.code, response.error.message + ) + } + kb_lib::KbJsonRpcWsIncomingMessage::Notification(notification) => { + format!( + "[ws:{endpoint_name}] json-rpc notification method={} subscription={} result={}", + notification.method, + notification.params.subscription, + notification.params.result + ) + } + }, + kb_lib::WsEvent::JsonRpcParseError { + endpoint_name, + text, + error, + } => { + format!( + "[ws:{endpoint_name}] json-rpc parse error: {} | raw={}", + error, text + ) + } + kb_lib::WsEvent::SubscriptionRegistered { + endpoint_name, + subscription, + } => { + format!( + "[ws:{endpoint_name}] subscription registered subscribe_method={} unsubscribe_method={} notification_method={} request_id={} subscription_id={}", + subscription.subscribe_method, + subscription.unsubscribe_method, + subscription.notification_method, + subscription.request_id, + subscription.subscription_id + ) + } + kb_lib::WsEvent::SubscriptionNotification { + endpoint_name, + subscription, + notification, + method_matches_registry, + } => { + format!( + "[ws:{endpoint_name}] tracked notification subscription_id={} method={} expected={} matches={} result={}", + subscription.subscription_id, + notification.method, + subscription.notification_method, + method_matches_registry, + notification.params.result + ) + } + kb_lib::WsEvent::JsonRpcNotificationWithoutSubscription { + endpoint_name, + notification, + } => { + format!( + "[ws:{endpoint_name}] untracked notification method={} subscription={} result={}", + notification.method, notification.params.subscription, notification.params.result + ) + } + kb_lib::WsEvent::SubscriptionUnregistered { + endpoint_name, + subscription_id, + unsubscribe_method, + was_active, + } => { + format!( + "[ws:{endpoint_name}] subscription unregistered subscription_id={} unsubscribe_method={} was_active={}", + subscription_id, unsubscribe_method, was_active + ) + } + kb_lib::WsEvent::BinaryMessage { + endpoint_name, + data, + } => { + format!("[{endpoint_name}] binary ({} bytes)", data.len()) + } + kb_lib::WsEvent::Ping { + endpoint_name, + data, + } => { + format!("[{endpoint_name}] ping ({} bytes)", data.len()) + } + kb_lib::WsEvent::Pong { + endpoint_name, + data, + } => { + format!("[{endpoint_name}] pong ({} bytes)", data.len()) + } + kb_lib::WsEvent::CloseReceived { + endpoint_name, + code, + reason, + } => { + format!( + "[ws:{endpoint_name}] close received code={:?} reason={:?}", + code, reason + ) + } + kb_lib::WsEvent::Disconnected { endpoint_name } => { + format!("[ws:{endpoint_name}] disconnected") + } + kb_lib::WsEvent::Error { + endpoint_name, + error, + } => { + format!("[ws:{endpoint_name}] error: {error}") + } + } +} + +fn kb_build_action_result( + action: &str, + target: &str, + matched_count: usize, + changed_count: usize, +) -> DemoWsManagerActionResult { + let unchanged_count = matched_count.saturating_sub(changed_count); + DemoWsManagerActionResult { + action: action.to_string(), + target: target.to_string(), + matched_count, + changed_count, + unchanged_count, + } +} + +fn kb_action_past_tense(action: &str) -> &'static str { + match action { + "start" => "started", + "stop" => "stopped", + _ => "processed", + } +} + +fn kb_format_action_result_for_log(result: &DemoWsManagerActionResult) -> std::string::String { + let is_all = result.target == "all"; + let past = kb_action_past_tense(result.action.as_str()); + if result.matched_count == 0 { + if is_all { + return "[ui] no managed websocket endpoint is configured".to_string(); + } + return format!( + "[ui] no managed websocket endpoint matches role '{}'", + result.target + ); + } + if result.changed_count == 0 { + if is_all { + return format!( + "[ui] all managed websocket endpoints were already {}", + if result.action == "start" { + "started" + } else { + "stopped" + } + ); + } + return format!( + "[ui] role '{}' was already {} on {} endpoint(s)", + result.target, + if result.action == "start" { + "started" + } else { + "stopped" + }, + result.unchanged_count + ); + } + if result.unchanged_count == 0 { + if is_all { + return format!( + "[ui] {}ed {} managed websocket endpoint(s)", + past, result.changed_count + ); + } + return format!( + "[ui] {}ed role '{}' on {} endpoint(s)", + past, result.target, result.changed_count + ); + } + if is_all { + return format!( + "[ui] {}ed {} managed websocket endpoint(s); {} already {}", + past, + result.changed_count, + result.unchanged_count, + if result.action == "start" { + "started" + } else { + "stopped" + } + ); + } + format!( + "[ui] {}ed role '{}' on {} endpoint(s); {} already {}", + result.action, + result.target, + result.changed_count, + result.unchanged_count, + if result.action == "start" { + "started" + } else { + "stopped" + } + ) +} diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs index 91aa960..c37969a 100644 --- a/kb_app/src/lib.rs +++ b/kb_app/src/lib.rs @@ -11,6 +11,7 @@ mod demo_http; mod demo_ws; +mod demo_ws_manager; mod splash; pub use crate::splash::SplashOrder; @@ -37,6 +38,8 @@ struct KbAppState { config: kb_lib::KbConfig, ws_runtime: tokio::sync::Mutex, demo_ws_runtime: std::sync::Arc>, + demo_ws_manager_runtime: std::sync::Arc>, + ws_manager: std::sync::Arc, http_pool: kb_lib::HttpEndpointPool, } @@ -82,12 +85,24 @@ pub fn run() { panic!("cannot create http endpoint pool: {}", error); } }; + let ws_manager_result = kb_lib::WsManager::from_config(&config); + let ws_manager = match ws_manager_result { + Ok(ws_manager) => ws_manager, + Err(error) => { + tracing::error!("cannot create websocket manager: {}", error); + panic!("cannot create websocket manager: {}", error); + } + }; let app_state = KbAppState { config: config.clone(), ws_runtime: tokio::sync::Mutex::new(KbWsRuntimeState::new()), demo_ws_runtime: std::sync::Arc::new(tokio::sync::Mutex::new( crate::demo_ws::KbDemoWsRuntimeState::new(), )), + demo_ws_manager_runtime: std::sync::Arc::new(tokio::sync::Mutex::new( + crate::demo_ws_manager::KbDemoWsManagerRuntimeState::new(), + )), + ws_manager: std::sync::Arc::new(ws_manager), http_pool, }; let tracing_builder = tauri_plugin_tracing::Builder::new(); @@ -106,6 +121,13 @@ pub fn run() { crate::demo_http::open_demo_http_window, crate::demo_http::demo_http_list_pool_clients, crate::demo_http::demo_http_execute_request, + crate::demo_ws_manager::open_demo_ws_manager_window, + crate::demo_ws_manager::demo_ws_manager_get_snapshot, + crate::demo_ws_manager::demo_ws_manager_list_roles, + crate::demo_ws_manager::demo_ws_manager_start_all, + crate::demo_ws_manager::demo_ws_manager_stop_all, + crate::demo_ws_manager::demo_ws_manager_start_role, + crate::demo_ws_manager::demo_ws_manager_stop_role, ]); tauri_builder = tauri_builder.plugin(tracing_builder.build::()); tauri_builder = tauri_builder.setup(|app| { diff --git a/kb_app/tauri.conf.json b/kb_app/tauri.conf.json index 203f714..833d2de 100644 --- a/kb_app/tauri.conf.json +++ b/kb_app/tauri.conf.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.tauri.app/config/2", "productName": "kb-bapp", - "version": "0.4.4", + "version": "0.6.6", "identifier": "com.sasedev.kb-app", "build": { "beforeDevCommand": "npm run dev", @@ -64,6 +64,20 @@ "create": false, "transparent": false, "decorations": true + }, + { + "label": "demo_ws_manager", + "url": "demo_ws_manager.html", + "title": "Demo Ws Manager", + "width": 1280, + "height": 800, + "minWidth": 900, + "minHeight": 620, + "center": true, + "visible": false, + "create": false, + "transparent": false, + "decorations": true } ], "security": { diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs index 74db560..a976a21 100644 --- a/kb_lib/src/ws_manager.rs +++ b/kb_lib/src/ws_manager.rs @@ -139,11 +139,14 @@ impl WsManager { let endpoint_names = self.endpoint_names_for_role(role).await; let mut started_count = 0_usize; for endpoint_name in endpoint_names { - let start_result = self.start_endpoint(endpoint_name.as_str()).await; - if let Err(error) = start_result { - return Err(error); + let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await; + let started = match start_result { + Ok(started) => started, + Err(error) => return Err(error), + }; + if started { + started_count += 1; } - started_count += 1; } Ok(started_count) } @@ -153,11 +156,14 @@ impl WsManager { let endpoint_names = self.endpoint_names_for_role(role).await; let mut stopped_count = 0_usize; for endpoint_name in endpoint_names { - let stop_result = self.stop_endpoint(endpoint_name.as_str()).await; - if let Err(error) = stop_result { - return Err(error); + let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await; + let stopped = match stop_result { + Ok(stopped) => stopped, + Err(error) => return Err(error), + }; + if stopped { + stopped_count += 1; } - stopped_count += 1; } Ok(stopped_count) } @@ -172,8 +178,7 @@ impl WsManager { } } - /// Starts one managed endpoint. - pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + async fn start_endpoint_inner(&self, endpoint_name: &str) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, @@ -184,6 +189,12 @@ impl WsManager { ))); } }; + let state = client.connection_state().await; + if state == crate::KbConnectionState::Connected + || state == crate::KbConnectionState::Connecting + { + return Ok(false); + } let sender_option = { let sender_guard = self.detection_relay_sender.lock().await; sender_guard.clone() @@ -195,11 +206,19 @@ impl WsManager { if let Err(error) = connect_result { return Err(error); } - Ok(()) + Ok(true) } - /// Stops one managed endpoint. - pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + /// Starts one managed endpoint. + pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + let start_result = self.start_endpoint_inner(endpoint_name).await; + match start_result { + Ok(_) => Ok(()), + Err(error) => Err(error), + } + } + + async fn stop_endpoint_inner(&self, endpoint_name: &str) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, @@ -210,12 +229,27 @@ impl WsManager { ))); } }; + let state = client.connection_state().await; + if state == crate::KbConnectionState::Disconnected + || state == crate::KbConnectionState::Disconnecting + { + return Ok(false); + } client.clear_detection_notification_forwarder().await; let disconnect_result = client.disconnect().await; if let Err(error) = disconnect_result { return Err(error); } - Ok(()) + Ok(true) + } + + /// Stops one managed endpoint. + pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + let stop_result = self.stop_endpoint_inner(endpoint_name).await; + match stop_result { + Ok(_) => Ok(()), + Err(error) => Err(error), + } } /// Starts all managed endpoints. @@ -223,11 +257,14 @@ impl WsManager { let endpoint_names = self.endpoint_names().await; let mut started_count = 0_usize; for endpoint_name in endpoint_names { - let start_result = self.start_endpoint(endpoint_name.as_str()).await; - if let Err(error) = start_result { - return Err(error); + let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await; + let started = match start_result { + Ok(started) => started, + Err(error) => return Err(error), + }; + if started { + started_count += 1; } - started_count += 1; } Ok(started_count) } @@ -237,11 +274,14 @@ impl WsManager { let endpoint_names = self.endpoint_names().await; let mut stopped_count = 0_usize; for endpoint_name in endpoint_names { - let stop_result = self.stop_endpoint(endpoint_name.as_str()).await; - if let Err(error) = stop_result { - return Err(error); + let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await; + let stopped = match stop_result { + Ok(stopped) => stopped, + Err(error) => return Err(error), + }; + if stopped { + stopped_count += 1; } - stopped_count += 1; } Ok(stopped_count) }