From b034fdf1c4ded57c23250a3535223c60204bbc00 Mon Sep 17 00:00:00 2001
From: SinuS Von SifriduS
Date: Sat, 25 Apr 2026 18:10:40 +0200
Subject: [PATCH] 0.6.6
---
CHANGELOG.md | 1 +
Cargo.toml | 2 +-
ROADMAP.md | 56 ++-
kb_app/capabilities/default.json | 3 +-
kb_app/frontend/demo_ws_manager.html | 109 +++++
kb_app/frontend/main.html | 10 +
kb_app/frontend/ts/demo_http.ts | 4 +-
kb_app/frontend/ts/demo_ws.ts | 5 +-
kb_app/frontend/ts/demo_ws_manager.ts | 237 +++++++++++
kb_app/frontend/ts/main.ts | 22 +
kb_app/gen/schemas/capabilities.json | 2 +-
kb_app/package.json | 2 +-
kb_app/src/demo_ws_manager.rs | 565 ++++++++++++++++++++++++++
kb_app/src/lib.rs | 22 +
kb_app/tauri.conf.json | 16 +-
kb_lib/src/ws_manager.rs | 84 +++-
16 files changed, 1088 insertions(+), 52 deletions(-)
create mode 100644 kb_app/frontend/demo_ws_manager.html
create mode 100644 kb_app/frontend/ts/demo_ws_manager.ts
create mode 100644 kb_app/src/demo_ws_manager.rs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8161508..3bc3288 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,3 +29,4 @@
0.6.3 - Enrichissement des notifications WebSocket utiles : extraction améliorée de pubkey, signature, owner, parsed account type et slot pour account/logs/signature notifications
0.6.4 - Premières règles de détection technique pour candidats pools/listings depuis programNotification en s’appuyant sur les DEX connus en base
0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection
+0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent
diff --git a/Cargo.toml b/Cargo.toml
index 5d20d1e..f88990e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
-version = "0.6.5"
+version = "0.6.6"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"
diff --git a/ROADMAP.md b/ROADMAP.md
index 2d0a8b1..0d379e3 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -436,7 +436,18 @@ Réalisé :
- branchement optionnel du relais de détection WS sur tous les clients orchestrés,
- préparation des futures politiques de répartition, supervision et reconnexion.
-### 6.031. Version `0.7.0` — Résolution transactionnelle orientée DEX
+### 6.031. Version `0.6.6` — Démo légère `WsManager` dans `kb_app`
+Réalisé :
+
+- ajout d’une fenêtre `Demo Ws Manager` dans `kb_app`,
+- ouverture depuis la fenêtre principale,
+- affichage du snapshot consolidé du `WsManager`,
+- pilotage des endpoints WS gérés via `start/stop all` et `start/stop role`,
+- visualisation du flux unifié de `WsEvent`,
+- validation UI du branchement centralisé du relais de détection,
+- amélioration des messages de log UI pour les actions idempotentes déjà démarrées ou déjà arrêtées.
+
+### 6.032. Version `0.7.0` — Résolution transactionnelle orientée DEX
Objectif : relier la détection temps réel aux transactions Solana complètes.
À faire :
@@ -446,7 +457,7 @@ Objectif : relier la détection temps réel aux transactions Solana complètes.
- utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS,
- éviter qu’une notification intéressante reste au niveau d’un simple signal technique sans résolution métier.
-### 6.032. Version `0.7.1` — Modèle transactionnel Solana enrichi
+### 6.033. Version `0.7.1` — Modèle transactionnel Solana enrichi
Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slot -> signature -> instruction`.
À faire :
@@ -456,7 +467,7 @@ Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slo
- conserver la possibilité de relier plus tard un pool, un token ou un wallet à une signature fondatrice,
- préparer l’historique transactionnel nécessaire aux futurs décodeurs DEX.
-### 6.033. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version
+### 6.034. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version
Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés.
À faire :
@@ -466,7 +477,7 @@ Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust d
- encapsuler les index de comptes et les motifs de logs propres à chaque protocole,
- prévoir des décodeurs séparés au minimum pour Raydium, Pump.fun / PumpSwap, Meteora, puis les autres cibles.
-### 6.034. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction
+### 6.035. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction
Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RPC et des transactions enrichies.
À faire :
@@ -476,7 +487,7 @@ Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RP
- extraire token A, token B, LP mint, vaults et comptes utiles quand cela est possible,
- alimenter `kb_pools`, `kb_pairs`, `kb_pool_tokens` et `kb_pool_listings` avec des données plus fiables que la seule détection de comptes.
-### 6.035. Version `0.7.4` — Modèle métier DEX enrichi
+### 6.036. Version `0.7.4` — Modèle métier DEX enrichi
Objectif : faire converger la détection technique et le modèle métier vers une vision proche de l’activité réelle du marché.
À faire :
@@ -486,7 +497,7 @@ Objectif : faire converger la détection technique et le modèle métier vers un
- préparer une vision cohérente `token <-> pool <-> pair <-> protocole`,
- distinguer les objets de référence des événements d’activité.
-### 6.036. Version `0.7.5` — Wallets, holdings et participants observés
+### 6.037. Version `0.7.5` — Wallets, holdings et participants observés
Objectif : préparer le suivi des acteurs on-chain autour des pools et tokens détectés.
À faire :
@@ -496,7 +507,7 @@ Objectif : préparer le suivi des acteurs on-chain autour des pools et tokens d
- préparer l’identification des créateurs, mint authorities, wallets d’activité et contreparties,
- éviter de limiter l’analyse future au seul niveau token/pool sans vision des participants.
-### 6.037. Version `0.7.6` — Séries de prix, volumes et agrégats DEX
+### 6.038. Version `0.7.6` — Séries de prix, volumes et agrégats DEX
Objectif : préparer la couche analytique fine à partir des événements métier normalisés.
À faire :
@@ -506,7 +517,7 @@ Objectif : préparer la couche analytique fine à partir des événements métie
- permettre plus tard le calcul d’OHLCV, volume, nombre de trades et liquidité par fenêtre,
- préparer le terrain pour la couche analytique `0.8.x`.
-### 6.038. Version `0.7.x` — DEX connectors v1
+### 6.039. Version `0.7.x` — DEX connectors v1
Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier.
Cibles initiales possibles :
@@ -526,7 +537,7 @@ Résultat attendu :
- création d’objets métier riches pour tokens, pools, paires, wallets, holdings et séries de prix,
- remplacement progressif des scripts heuristiques externes par des composants Rust intégrés.
-### 6.039. Version `0.8.x` — Analyse et filtrage
+### 6.040. Version `0.8.x` — Analyse et filtrage
Objectif : transformer les événements bruts en signaux exploitables.
À faire :
@@ -537,7 +548,7 @@ Objectif : transformer les événements bruts en signaux exploitables.
- statistiques de comportement,
- premiers patterns.
-### 6.040. Version `1.x.y` — Wallets et swap préparatoire
+### 6.041. Version `1.x.y` — Wallets et swap préparatoire
Objectif : préparer la couche d’action.
À faire :
@@ -548,7 +559,7 @@ Objectif : préparer la couche d’action.
- préparation d’ordres et de swaps,
- simulation et garde-fous.
-### 6.041. Version `2.x.y` — Trading semi-automatisé
+### 6.042. Version `2.x.y` — Trading semi-automatisé
Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites.
À faire :
@@ -559,7 +570,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp
- confirmations explicites ou semi-automatiques,
- journaux d’exécution.
-### 6.042. Version `3.x.y` — Yellowstone gRPC
+### 6.043. Version `3.x.y` — Yellowstone gRPC
Objectif : ajouter le connecteur gRPC dédié.
À faire :
@@ -580,10 +591,12 @@ Modules cibles à court terme :
- `constants.rs`
- `types.rs`
- `ws_client.rs`
+- `ws_manager.rs`
- `http_client.rs`
-- `rpc_json.rs`
-- `rpc_ws.rs`
-- `rpc_ws_solana.rs`
+- `http_pool.rs`
+- `json_rpc_ws.rs`
+- `solana_pubsub_ws.rs`
+- `detect.rs`
### 7.2. `kb_app`
Responsabilités cibles :
@@ -644,9 +657,10 @@ Le projet doit maintenir au minimum :
## 12. Priorité immédiate
La priorité immédiate est désormais la suivante :
-1. démarrer la version `0.6.3` avec l’enrichissement des notifications WS utiles,
-2. améliorer l’extraction des métadonnées utiles depuis `accountNotification`, `logsNotification` et `signatureNotification`,
-3. produire des observations on-chain plus précises et homogènes,
-4. préparer ensuite la version `0.6.4` pour les premières règles de détection technique,
-5. conserver le découplage entre transport, détection et stockage,
-6. planifier enfin `0.6.5` pour l’orchestration multi-clients WebSocket via `ws_pool.rs` ou `ws_manager.rs`.
+1. démarrer la version `0.7.0` avec la résolution transactionnelle orientée DEX,
+2. introduire une file de signatures ou de résolutions alimentée par les flux WS utiles,
+3. corréler `logsNotification`, `programNotification` et `signatureNotification` avec des appels `getTransaction`,
+4. utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS,
+5. préparer ensuite la version `0.7.1` pour le modèle transactionnel Solana enrichi,
+6. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage.
+
diff --git a/kb_app/capabilities/default.json b/kb_app/capabilities/default.json
index 8f272a1..2d822f0 100644
--- a/kb_app/capabilities/default.json
+++ b/kb_app/capabilities/default.json
@@ -6,7 +6,8 @@
"main",
"splash",
"demo_ws",
- "demo_http"
+ "demo_http",
+ "demo_ws_manager"
],
"permissions": [
"core:default",
diff --git a/kb_app/frontend/demo_ws_manager.html b/kb_app/frontend/demo_ws_manager.html
new file mode 100644
index 0000000..9f0af67
--- /dev/null
+++ b/kb_app/frontend/demo_ws_manager.html
@@ -0,0 +1,109 @@
+
+
+
+
+
+
+
+ Khadhroony-BoBoBot — Demo Ws Manager
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kb_app/frontend/main.html b/kb_app/frontend/main.html
index f5993a1..48cffa0 100644
--- a/kb_app/frontend/main.html
+++ b/kb_app/frontend/main.html
@@ -25,6 +25,9 @@
Demo Http
+
+ Demo Ws Manager
+
@@ -54,6 +57,10 @@
Les tests PRC Http manuels sont disponibles dans la fenêtre dédiée
Demo Http .
+
+ La démonstration légère de pilotage multi-clients est disponible dans la fenêtre
+ Demo Ws Manager .
+
@@ -62,6 +69,9 @@
Ouvrir Demo Http
+
+ Ouvrir Demo Ws Manager
+
diff --git a/kb_app/frontend/ts/demo_http.ts b/kb_app/frontend/ts/demo_http.ts
index 03ad29b..0b20105 100644
--- a/kb_app/frontend/ts/demo_http.ts
+++ b/kb_app/frontend/ts/demo_http.ts
@@ -268,6 +268,8 @@ async function refreshPoolSnapshot(
document.addEventListener("DOMContentLoaded", async () => {
void takeoverConsole();
+ debug("demo_http window loaded");
+
const sidebarToggle = document.querySelector('#sidebarToggle');
if (sidebarToggle) {
// restaurer l’état depuis localStorage
@@ -432,6 +434,4 @@ document.addEventListener("DOMContentLoaded", async () => {
appendLogLine(logTextarea, "[ui] demo_http window loaded");
await refreshPoolSnapshot(poolTableBody, logTextarea, true);
-
- debug("demo_http window loaded");
});
\ No newline at end of file
diff --git a/kb_app/frontend/ts/demo_ws.ts b/kb_app/frontend/ts/demo_ws.ts
index 7124e53..459a799 100644
--- a/kb_app/frontend/ts/demo_ws.ts
+++ b/kb_app/frontend/ts/demo_ws.ts
@@ -240,6 +240,9 @@ function applyStatusToUi(
document.addEventListener("DOMContentLoaded", async () => {
void takeoverConsole();
+
+ debug("demo_ws window loaded");
+
const sidebarToggle = document.querySelector('#sidebarToggle');
if (sidebarToggle) {
// restaurer l’état depuis localStorage
@@ -514,6 +517,4 @@ document.addEventListener("DOMContentLoaded", async () => {
unlistenStatusEvent();
}
});
-
- debug("demo_ws window loaded");
});
\ No newline at end of file
diff --git a/kb_app/frontend/ts/demo_ws_manager.ts b/kb_app/frontend/ts/demo_ws_manager.ts
new file mode 100644
index 0000000..4c03587
--- /dev/null
+++ b/kb_app/frontend/ts/demo_ws_manager.ts
@@ -0,0 +1,237 @@
+// file: kb_app/frontend/ts/demo_ws_manager.ts
+
+import * as bootstrap from "bootstrap";
+import "simplebar";
+import ResizeObserver from "resize-observer-polyfill";
+import { invoke } from "@tauri-apps/api/core";
+import { listen } from "@tauri-apps/api/event";
+import { debug, takeoverConsole } from "@fltsci/tauri-plugin-tracing";
+
+(window as Window & typeof globalThis & { bootstrap?: typeof bootstrap }).bootstrap = bootstrap;
+(window as Window & typeof globalThis & { ResizeObserver?: typeof ResizeObserver }).ResizeObserver = ResizeObserver;
+
+type DemoWsManagerEndpointSummary = {
+ name: string;
+ resolvedUrl: string;
+ provider: string;
+ roles: string[];
+ connectionState: string;
+ activeSubscriptionCount: number;
+};
+
+type DemoWsManagerSnapshotPayload = {
+ endpointCount: number;
+ startedCount: number;
+ endpoints: DemoWsManagerEndpointSummary[];
+};
+
+const endpointCountText = document.querySelector("#demoWsManagerEndpointCountText");
+const startedCountText = document.querySelector("#demoWsManagerStartedCountText");
+const roleSelect = document.querySelector("#demoWsManagerRoleSelect");
+const tableBody = document.querySelector("#demoWsManagerTableBody");
+const logTextarea = document.querySelector("#demoWsManagerLogTextarea");
+const startAllButton = document.querySelector("#demoWsManagerStartAllButton");
+const stopAllButton = document.querySelector("#demoWsManagerStopAllButton");
+const refreshButton = document.querySelector("#demoWsManagerRefreshButton");
+const startRoleButton = document.querySelector("#demoWsManagerStartRoleButton");
+const stopRoleButton = document.querySelector("#demoWsManagerStopRoleButton");
+const clearLogButton = document.querySelector("#demoWsManagerClearLogButton");
+
+function appendLogLine(line: string): void {
+ if (!logTextarea) {
+ return;
+ }
+ const prefix = logTextarea.value.length > 0 ? "\n" : "";
+ logTextarea.value += `${prefix}${line}`;
+ logTextarea.scrollTop = logTextarea.scrollHeight;
+}
+
+function renderSnapshot(snapshot: DemoWsManagerSnapshotPayload): void {
+ if (endpointCountText) {
+ endpointCountText.textContent = String(snapshot.endpointCount);
+ }
+ if (startedCountText) {
+ startedCountText.textContent = String(snapshot.startedCount);
+ }
+ if (!tableBody) {
+ return;
+ }
+
+ tableBody.innerHTML = "";
+
+ for (const endpoint of snapshot.endpoints) {
+ const row = document.createElement("tr");
+ row.innerHTML = `
+
+ ${endpoint.name}
+ ${endpoint.resolvedUrl}
+
+ ${endpoint.provider}
+ ${endpoint.roles.join(", ")}
+ ${endpoint.connectionState}
+ ${endpoint.activeSubscriptionCount}
+ `;
+ tableBody.appendChild(row);
+ }
+}
+
+async function refreshSnapshot(): Promise {
+ try {
+ const snapshot = await invoke("demo_ws_manager_get_snapshot");
+ renderSnapshot(snapshot);
+ appendLogLine("[ui] refreshed manager snapshot");
+ } catch (error) {
+ appendLogLine(`[ui] snapshot error: ${String(error)}`);
+ }
+}
+
+async function loadRoles(): Promise {
+ if (!roleSelect) {
+ return;
+ }
+
+ roleSelect.innerHTML = "";
+
+ try {
+ const roles = await invoke("demo_ws_manager_list_roles");
+ for (const role of roles) {
+ const option = document.createElement("option");
+ option.value = role;
+ option.textContent = role;
+ roleSelect.appendChild(option);
+ }
+ } catch (error) {
+ appendLogLine(`[ui] list roles error: ${String(error)}`);
+ }
+}
+
+async function startAll(): Promise {
+ try {
+ const snapshot = await invoke("demo_ws_manager_start_all");
+ renderSnapshot(snapshot);
+ } catch (error) {
+ appendLogLine(`[ui] start all error: ${String(error)}`);
+ }
+}
+
+async function stopAll(): Promise {
+ try {
+ const snapshot = await invoke("demo_ws_manager_stop_all");
+ renderSnapshot(snapshot);
+ } catch (error) {
+ appendLogLine(`[ui] stop all error: ${String(error)}`);
+ }
+}
+
+async function startRole(): Promise {
+ if (!roleSelect || roleSelect.value.trim().length === 0) {
+ appendLogLine("[ui] no role selected");
+ return;
+ }
+
+ try {
+ const snapshot = await invoke("demo_ws_manager_start_role", {
+ role: roleSelect.value,
+ });
+ renderSnapshot(snapshot);
+ } catch (error) {
+ appendLogLine(`[ui] start role error: ${String(error)}`);
+ }
+}
+
+async function stopRole(): Promise {
+ if (!roleSelect || roleSelect.value.trim().length === 0) {
+ appendLogLine("[ui] no role selected");
+ return;
+ }
+
+ try {
+ const snapshot = await invoke("demo_ws_manager_stop_role", {
+ role: roleSelect.value,
+ });
+ renderSnapshot(snapshot);
+ } catch (error) {
+ appendLogLine(`[ui] stop role error: ${String(error)}`);
+ }
+}
+
+document.addEventListener("DOMContentLoaded", async () => {
+ void takeoverConsole();
+ debug("demo_ws_manager window loaded");
+
+ const sidebarToggle = document.querySelector('#sidebarToggle');
+ if (sidebarToggle) {
+ // restaurer l’état depuis localStorage
+ if (localStorage.getItem('sidebar-toggle') === 'true') {
+ document.body.classList.add('sidenav-toggled');
+ }
+
+ sidebarToggle.addEventListener('click', (event) => {
+ event.preventDefault();
+ document.body.classList.toggle('sidenav-toggled');
+ localStorage.setItem('sidebar-toggle', document.body.classList.contains('sidenav-toggled') ? 'true' : 'false');
+ });
+ }
+
+ const tooltipTriggerList = document.querySelectorAll('[data-bs-toggle="tooltip"]');
+ Array.from(tooltipTriggerList).map(tooltipTriggerEl => new bootstrap.Tooltip(tooltipTriggerEl));
+ const toastElList = document.querySelectorAll('.toast');
+ Array.from(toastElList).map(toastEl => new bootstrap.Toast(toastEl));
+ const popoverTriggerList = document.querySelectorAll('[data-bs-toggle="popover"]');
+ Array.from(popoverTriggerList).map(popoverTriggerEl => new bootstrap.Popover(popoverTriggerEl));
+
+ const gobackto = location.pathname + location.search;
+
+ document.querySelectorAll('a[data-setlang]').forEach((a) => {
+ const href = a.getAttribute("href");
+ if (!href) return; // pas de href => on ignore
+
+ const url = new URL(href, location.origin);
+ url.searchParams.set("gobackto", gobackto);
+
+ // conserve une URL relative (path + query)
+ a.setAttribute("href", url.pathname + "?" + url.searchParams.toString());
+ });
+
+ if (startAllButton) {
+ startAllButton.addEventListener("click", () => {
+ void startAll();
+ });
+ }
+ if (stopAllButton) {
+ stopAllButton.addEventListener("click", () => {
+ void stopAll();
+ });
+ }
+ if (refreshButton) {
+ refreshButton.addEventListener("click", () => {
+ void refreshSnapshot();
+ });
+ }
+ if (startRoleButton) {
+ startRoleButton.addEventListener("click", () => {
+ void startRole();
+ });
+ }
+ if (stopRoleButton) {
+ stopRoleButton.addEventListener("click", () => {
+ void stopRole();
+ });
+ }
+ if (clearLogButton && logTextarea) {
+ clearLogButton.addEventListener("click", () => {
+ logTextarea.value = "";
+ });
+ }
+
+ await listen("kb-demo-ws-manager-log", (event) => {
+ appendLogLine(event.payload);
+ });
+
+ await listen("kb-demo-ws-manager-snapshot", (event) => {
+ renderSnapshot(event.payload);
+ });
+
+ await loadRoles();
+ await refreshSnapshot();
+});
diff --git a/kb_app/frontend/ts/main.ts b/kb_app/frontend/ts/main.ts
index 4b9605c..81112d9 100644
--- a/kb_app/frontend/ts/main.ts
+++ b/kb_app/frontend/ts/main.ts
@@ -23,6 +23,14 @@ async function openDemoHttpWindow(): Promise {
console.error("open_demo_http_window failed:", error);
}
}
+
+async function openDemoWsManagerWindow(): Promise {
+ try {
+ await invoke("open_demo_ws_manager_window");
+ } catch (error) {
+ console.error("open_demo_ws_manager_window failed:", error);
+ }
+}
document.addEventListener("DOMContentLoaded", async () => {
void takeoverConsole();
const sidebarToggle = document.querySelector('#sidebarToggle');
@@ -64,6 +72,8 @@ document.addEventListener("DOMContentLoaded", async () => {
const openDemoHttpButton = document.querySelector("#openDemoHttpButton");
const openDemoHttpButtonSecondary = document.querySelector("#openDemoHttpButtonSecondary");
+ const openDemoWsManagerButton = document.querySelector("#openDemoWsManagerButton");
+ const openDemoWsManagerButtonSecondary = document.querySelector("#openDemoWsManagerButtonSecondary");
if (openDemoWsButton) {
openDemoWsButton.addEventListener("click", () => {
@@ -88,6 +98,18 @@ document.addEventListener("DOMContentLoaded", async () => {
void openDemoHttpWindow();
});
}
+
+ if (openDemoWsManagerButton) {
+ openDemoWsManagerButton.addEventListener("click", () => {
+ void openDemoWsManagerWindow();
+ });
+ }
+
+ if (openDemoWsManagerButtonSecondary) {
+ openDemoWsManagerButtonSecondary.addEventListener("click", () => {
+ void openDemoWsManagerWindow();
+ });
+ }
debug("window loaded");
diff --git a/kb_app/gen/schemas/capabilities.json b/kb_app/gen/schemas/capabilities.json
index 942cafe..99db2a1 100644
--- a/kb_app/gen/schemas/capabilities.json
+++ b/kb_app/gen/schemas/capabilities.json
@@ -1 +1 @@
-{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http"],"permissions":["core:default","tracing:default"]}}
\ No newline at end of file
+{"default":{"identifier":"default","description":"Capability for the main window","local":true,"windows":["main","splash","demo_ws","demo_http","demo_ws_manager"],"permissions":["core:default","tracing:default"]}}
\ No newline at end of file
diff --git a/kb_app/package.json b/kb_app/package.json
index e2b8c88..1c58b35 100644
--- a/kb_app/package.json
+++ b/kb_app/package.json
@@ -1,7 +1,7 @@
{
"name": "kb-app",
"private": true,
- "version": "0.4.4",
+ "version": "0.6.6",
"type": "module",
"scripts": {
"dev": "vite",
diff --git a/kb_app/src/demo_ws_manager.rs b/kb_app/src/demo_ws_manager.rs
new file mode 100644
index 0000000..974ee4a
--- /dev/null
+++ b/kb_app/src/demo_ws_manager.rs
@@ -0,0 +1,565 @@
+// file: kb_app/src/demo_ws_manager.rs
+
+//! Demo WebSocket manager window commands and runtime state.
+//!
+//! This module provides a lightweight test bench for `kb_lib::WsManager`.
+
+use tauri::Emitter;
+use tauri::Manager;
+
+/// Static endpoint summary enriched with current manager state.
+#[derive(Clone, Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoWsManagerEndpointSummary {
+ name: std::string::String,
+ resolved_url: std::string::String,
+ provider: std::string::String,
+ roles: std::vec::Vec,
+ connection_state: std::string::String,
+ active_subscription_count: usize,
+}
+
+/// Global demo manager snapshot payload.
+#[derive(Clone, Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct KbDemoWsManagerSnapshotPayload {
+ endpoint_count: usize,
+ started_count: usize,
+ endpoints: std::vec::Vec,
+}
+
+/// Runtime state for the demo WebSocket manager window.
+#[derive(Debug)]
+pub(crate) struct KbDemoWsManagerRuntimeState {
+ relay_task: std::option::Option>,
+}
+
+impl KbDemoWsManagerRuntimeState {
+ /// Creates a new empty runtime state.
+ pub(crate) fn new() -> Self {
+ Self { relay_task: None }
+ }
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct DemoWsManagerActionResult {
+ action: std::string::String,
+ target: std::string::String,
+ matched_count: usize,
+ changed_count: usize,
+ unchanged_count: usize,
+}
+
+/// Shows and focuses the preconfigured `demo_ws_manager` window.
+#[tauri::command]
+pub(crate) async fn open_demo_ws_manager_window(
+ app_handle: tauri::AppHandle,
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result<(), std::string::String> {
+ kb_ensure_demo_ws_manager_relay(&app_handle, &state).await;
+ let existing_window_option = app_handle.get_webview_window("demo_ws_manager");
+ let demo_window = match existing_window_option {
+ Some(demo_window) => demo_window,
+ None => {
+ let builder = tauri::WebviewWindowBuilder::new(
+ &app_handle,
+ "demo_ws_manager",
+ tauri::WebviewUrl::App("demo_ws_manager.html".into()),
+ )
+ .title("Demo Ws Manager")
+ .inner_size(1280.0, 800.0)
+ .min_inner_size(900.0, 620.0)
+ .center()
+ .visible(true)
+ .transparent(false)
+ .decorations(true);
+ let build_result = builder.build();
+ match build_result {
+ Ok(window) => window,
+ Err(error) => {
+ return Err(format!("cannot create demo_ws_manager window: {error:?}"));
+ }
+ }
+ }
+ };
+ let show_result = demo_window.show();
+ if let Err(error) = show_result {
+ return Err(format!("cannot show demo_ws_manager window: {error:?}"));
+ }
+ let focus_result = demo_window.set_focus();
+ if let Err(error) = focus_result {
+ return Err(format!("cannot focus demo_ws_manager window: {error:?}"));
+ }
+ kb_emit_demo_ws_manager_log(&app_handle, "[ui] demo_ws_manager window loaded");
+ kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await;
+ Ok(())
+}
+
+/// Returns the current manager snapshot.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_get_snapshot(
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result {
+ kb_build_demo_ws_manager_snapshot(&state).await
+}
+
+/// Returns the distinct configured roles for enabled websocket endpoints.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_list_roles(
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result, std::string::String> {
+ let mut roles = std::collections::BTreeSet::new();
+ for endpoint in &state.config.solana.ws_endpoints {
+ if !endpoint.enabled {
+ continue;
+ }
+ for role in &endpoint.roles {
+ roles.insert(role.clone());
+ }
+ }
+ Ok(roles.into_iter().collect())
+}
+
+/// Starts all managed websocket endpoints.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_start_all(
+ app_handle: tauri::AppHandle,
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result {
+ kb_ensure_demo_ws_manager_relay(&app_handle, &state).await;
+ let matched_count = state.ws_manager.endpoint_names().await.len();
+ let start_result = state.ws_manager.start_all().await;
+ let changed_count = match start_result {
+ Ok(changed_count) => changed_count,
+ Err(error) => return Err(error.to_string()),
+ };
+ let action_result = kb_build_action_result("start", "all", matched_count, changed_count);
+ kb_emit_demo_ws_manager_log(
+ &app_handle,
+ kb_format_action_result_for_log(&action_result).as_str(),
+ );
+ kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await;
+ kb_build_demo_ws_manager_snapshot(&state).await
+}
+
+/// Stops all managed websocket endpoints.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_stop_all(
+ app_handle: tauri::AppHandle,
+ state: tauri::State<'_, crate::KbAppState>,
+) -> Result {
+ let matched_count = state.ws_manager.endpoint_names().await.len();
+ let stop_result = state.ws_manager.stop_all().await;
+ let changed_count = match stop_result {
+ Ok(changed_count) => changed_count,
+ Err(error) => return Err(error.to_string()),
+ };
+ let action_result = kb_build_action_result("stop", "all", matched_count, changed_count);
+ kb_emit_demo_ws_manager_log(
+ &app_handle,
+ kb_format_action_result_for_log(&action_result).as_str(),
+ );
+ kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await;
+ kb_build_demo_ws_manager_snapshot(&state).await
+}
+
+/// Starts all managed websocket endpoints having the selected role.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_start_role(
+ app_handle: tauri::AppHandle,
+ state: tauri::State<'_, crate::KbAppState>,
+ role: std::string::String,
+) -> Result {
+ kb_ensure_demo_ws_manager_relay(&app_handle, &state).await;
+ let matched_count = state
+ .ws_manager
+ .endpoint_names_for_role(role.as_str())
+ .await
+ .len();
+ let start_result = state.ws_manager.start_role(role.as_str()).await;
+ let changed_count = match start_result {
+ Ok(changed_count) => changed_count,
+ Err(error) => return Err(error.to_string()),
+ };
+ let action_result =
+ kb_build_action_result("start", role.as_str(), matched_count, changed_count);
+ kb_emit_demo_ws_manager_log(
+ &app_handle,
+ kb_format_action_result_for_log(&action_result).as_str(),
+ );
+ kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await;
+ kb_build_demo_ws_manager_snapshot(&state).await
+}
+
+/// Stops all managed websocket endpoints having the selected role.
+#[tauri::command]
+pub(crate) async fn demo_ws_manager_stop_role(
+ app_handle: tauri::AppHandle,
+ state: tauri::State<'_, crate::KbAppState>,
+ role: std::string::String,
+) -> Result {
+ let matched_count = state
+ .ws_manager
+ .endpoint_names_for_role(role.as_str())
+ .await
+ .len();
+ let stop_result = state.ws_manager.stop_role(role.as_str()).await;
+ let changed_count = match stop_result {
+ Ok(changed_count) => changed_count,
+ Err(error) => return Err(error.to_string()),
+ };
+ let action_result = kb_build_action_result("stop", role.as_str(), matched_count, changed_count);
+ kb_emit_demo_ws_manager_log(
+ &app_handle,
+ kb_format_action_result_for_log(&action_result).as_str(),
+ );
+ kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await;
+ kb_build_demo_ws_manager_snapshot(&state).await
+}
+
+async fn kb_build_demo_ws_manager_snapshot(
+ state: &tauri::State<'_, crate::KbAppState>,
+) -> Result {
+ let snapshot_result = state.ws_manager.snapshot().await;
+ let snapshot = match snapshot_result {
+ Ok(snapshot) => snapshot,
+ Err(error) => return Err(error.to_string()),
+ };
+ let mut endpoints = std::vec::Vec::new();
+ for managed_endpoint in snapshot.endpoints {
+ let config_endpoint_option = state
+ .config
+ .find_ws_endpoint(&managed_endpoint.endpoint_name);
+ let config_endpoint = match config_endpoint_option {
+ Some(config_endpoint) => config_endpoint,
+ None => {
+ return Err(format!(
+ "managed websocket endpoint '{}' is missing from config",
+ managed_endpoint.endpoint_name
+ ));
+ }
+ };
+ endpoints.push(KbDemoWsManagerEndpointSummary {
+ name: managed_endpoint.endpoint_name,
+ resolved_url: managed_endpoint.resolved_url,
+ provider: managed_endpoint.provider,
+ roles: config_endpoint.roles.clone(),
+ connection_state: kb_connection_state_to_string(managed_endpoint.state),
+ active_subscription_count: managed_endpoint.active_subscription_count,
+ });
+ }
+ Ok(KbDemoWsManagerSnapshotPayload {
+ endpoint_count: snapshot.endpoint_count,
+ started_count: snapshot.started_count,
+ endpoints,
+ })
+}
+
+async fn kb_emit_demo_ws_manager_snapshot(
+ app_handle: &tauri::AppHandle,
+ state: &tauri::State<'_, crate::KbAppState>,
+) {
+ let snapshot_result = kb_build_demo_ws_manager_snapshot(state).await;
+ let snapshot = match snapshot_result {
+ Ok(snapshot) => snapshot,
+ Err(error) => {
+ kb_emit_demo_ws_manager_log(app_handle, &format!("[ui] snapshot error: {error}"));
+ return;
+ }
+ };
+ let emit_result = app_handle.emit("kb-demo-ws-manager-snapshot", snapshot);
+ if let Err(error) = emit_result {
+ tracing::error!("error emitting demo_ws_manager snapshot event: {error:?}");
+ }
+}
+
+async fn kb_ensure_demo_ws_manager_relay(
+ app_handle: &tauri::AppHandle,
+ state: &tauri::State<'_, crate::KbAppState>,
+) {
+ let mut runtime_guard = state.demo_ws_manager_runtime.lock().await;
+ if runtime_guard.relay_task.is_some() {
+ return;
+ }
+ let mut receiver = state.ws_manager.subscribe_events();
+ let relay_app_handle = app_handle.clone();
+ let relay_state = state.demo_ws_manager_runtime.clone();
+ let relay_task = tauri::async_runtime::spawn(async move {
+ loop {
+ let recv_result = receiver.recv().await;
+ match recv_result {
+ Ok(event) => {
+ let line = kb_format_ws_event(&event);
+ kb_emit_demo_ws_manager_log(&relay_app_handle, line.as_str());
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
+ kb_emit_demo_ws_manager_log(
+ &relay_app_handle,
+ &format!(
+ "[manager] event receiver lagged and skipped {} message(s)",
+ skipped
+ ),
+ );
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+ break;
+ }
+ }
+ }
+ let mut runtime_guard = relay_state.lock().await;
+ runtime_guard.relay_task = None;
+ });
+
+ runtime_guard.relay_task = Some(relay_task);
+}
+
+fn kb_emit_demo_ws_manager_log(app_handle: &tauri::AppHandle, message: &str) {
+ let emit_result = app_handle.emit("kb-demo-ws-manager-log", message.to_string());
+ if let Err(error) = emit_result {
+ tracing::error!("error emitting demo_ws_manager log event: {error:?}");
+ }
+}
+
+fn kb_connection_state_to_string(state: kb_lib::KbConnectionState) -> std::string::String {
+ match state {
+ kb_lib::KbConnectionState::Disconnected => "Disconnected".to_string(),
+ kb_lib::KbConnectionState::Connecting => "Connecting".to_string(),
+ kb_lib::KbConnectionState::Connected => "Connected".to_string(),
+ kb_lib::KbConnectionState::Disconnecting => "Disconnecting".to_string(),
+ }
+}
+
+fn kb_format_ws_event(event: &kb_lib::WsEvent) -> std::string::String {
+ match event {
+ kb_lib::WsEvent::Connected {
+ endpoint_name,
+ endpoint_url,
+ } => {
+ format!("[ws:{endpoint_name}] connected to {endpoint_url}")
+ }
+ kb_lib::WsEvent::TextMessage {
+ endpoint_name,
+ text,
+ } => {
+ format!("[ws:{endpoint_name}] text: {text}")
+ }
+ kb_lib::WsEvent::JsonRpcMessage {
+ endpoint_name,
+ message,
+ } => match message {
+ kb_lib::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => {
+ format!(
+ "[ws:{endpoint_name}] json-rpc success id={} result={}",
+ response.id, response.result
+ )
+ }
+ kb_lib::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => {
+ format!(
+ "[ws:{endpoint_name}] json-rpc error id={} code={} message={}",
+ response.id, response.error.code, response.error.message
+ )
+ }
+ kb_lib::KbJsonRpcWsIncomingMessage::Notification(notification) => {
+ format!(
+ "[ws:{endpoint_name}] json-rpc notification method={} subscription={} result={}",
+ notification.method,
+ notification.params.subscription,
+ notification.params.result
+ )
+ }
+ },
+ kb_lib::WsEvent::JsonRpcParseError {
+ endpoint_name,
+ text,
+ error,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] json-rpc parse error: {} | raw={}",
+ error, text
+ )
+ }
+ kb_lib::WsEvent::SubscriptionRegistered {
+ endpoint_name,
+ subscription,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] subscription registered subscribe_method={} unsubscribe_method={} notification_method={} request_id={} subscription_id={}",
+ subscription.subscribe_method,
+ subscription.unsubscribe_method,
+ subscription.notification_method,
+ subscription.request_id,
+ subscription.subscription_id
+ )
+ }
+ kb_lib::WsEvent::SubscriptionNotification {
+ endpoint_name,
+ subscription,
+ notification,
+ method_matches_registry,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] tracked notification subscription_id={} method={} expected={} matches={} result={}",
+ subscription.subscription_id,
+ notification.method,
+ subscription.notification_method,
+ method_matches_registry,
+ notification.params.result
+ )
+ }
+ kb_lib::WsEvent::JsonRpcNotificationWithoutSubscription {
+ endpoint_name,
+ notification,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] untracked notification method={} subscription={} result={}",
+ notification.method, notification.params.subscription, notification.params.result
+ )
+ }
+ kb_lib::WsEvent::SubscriptionUnregistered {
+ endpoint_name,
+ subscription_id,
+ unsubscribe_method,
+ was_active,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] subscription unregistered subscription_id={} unsubscribe_method={} was_active={}",
+ subscription_id, unsubscribe_method, was_active
+ )
+ }
+ kb_lib::WsEvent::BinaryMessage {
+ endpoint_name,
+ data,
+ } => {
+ format!("[{endpoint_name}] binary ({} bytes)", data.len())
+ }
+ kb_lib::WsEvent::Ping {
+ endpoint_name,
+ data,
+ } => {
+ format!("[{endpoint_name}] ping ({} bytes)", data.len())
+ }
+ kb_lib::WsEvent::Pong {
+ endpoint_name,
+ data,
+ } => {
+ format!("[{endpoint_name}] pong ({} bytes)", data.len())
+ }
+ kb_lib::WsEvent::CloseReceived {
+ endpoint_name,
+ code,
+ reason,
+ } => {
+ format!(
+ "[ws:{endpoint_name}] close received code={:?} reason={:?}",
+ code, reason
+ )
+ }
+ kb_lib::WsEvent::Disconnected { endpoint_name } => {
+ format!("[ws:{endpoint_name}] disconnected")
+ }
+ kb_lib::WsEvent::Error {
+ endpoint_name,
+ error,
+ } => {
+ format!("[ws:{endpoint_name}] error: {error}")
+ }
+ }
+}
+
+fn kb_build_action_result(
+ action: &str,
+ target: &str,
+ matched_count: usize,
+ changed_count: usize,
+) -> DemoWsManagerActionResult {
+ let unchanged_count = matched_count.saturating_sub(changed_count);
+ DemoWsManagerActionResult {
+ action: action.to_string(),
+ target: target.to_string(),
+ matched_count,
+ changed_count,
+ unchanged_count,
+ }
+}
+
+fn kb_action_past_tense(action: &str) -> &'static str {
+ match action {
+ "start" => "started",
+ "stop" => "stopped",
+ _ => "processed",
+ }
+}
+
+fn kb_format_action_result_for_log(result: &DemoWsManagerActionResult) -> std::string::String {
+ let is_all = result.target == "all";
+ let past = kb_action_past_tense(result.action.as_str());
+ if result.matched_count == 0 {
+ if is_all {
+ return "[ui] no managed websocket endpoint is configured".to_string();
+ }
+ return format!(
+ "[ui] no managed websocket endpoint matches role '{}'",
+ result.target
+ );
+ }
+ if result.changed_count == 0 {
+ if is_all {
+ return format!(
+ "[ui] all managed websocket endpoints were already {}",
+ if result.action == "start" {
+ "started"
+ } else {
+ "stopped"
+ }
+ );
+ }
+ return format!(
+ "[ui] role '{}' was already {} on {} endpoint(s)",
+ result.target,
+ if result.action == "start" {
+ "started"
+ } else {
+ "stopped"
+ },
+ result.unchanged_count
+ );
+ }
+ if result.unchanged_count == 0 {
+ if is_all {
+ return format!(
+ "[ui] {}ed {} managed websocket endpoint(s)",
+ past, result.changed_count
+ );
+ }
+ return format!(
+ "[ui] {}ed role '{}' on {} endpoint(s)",
+ past, result.target, result.changed_count
+ );
+ }
+ if is_all {
+ return format!(
+ "[ui] {}ed {} managed websocket endpoint(s); {} already {}",
+ past,
+ result.changed_count,
+ result.unchanged_count,
+ if result.action == "start" {
+ "started"
+ } else {
+ "stopped"
+ }
+ );
+ }
+ format!(
+ "[ui] {}ed role '{}' on {} endpoint(s); {} already {}",
+ result.action,
+ result.target,
+ result.changed_count,
+ result.unchanged_count,
+ if result.action == "start" {
+ "started"
+ } else {
+ "stopped"
+ }
+ )
+}
diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs
index 91aa960..c37969a 100644
--- a/kb_app/src/lib.rs
+++ b/kb_app/src/lib.rs
@@ -11,6 +11,7 @@
mod demo_http;
mod demo_ws;
+mod demo_ws_manager;
mod splash;
pub use crate::splash::SplashOrder;
@@ -37,6 +38,8 @@ struct KbAppState {
config: kb_lib::KbConfig,
ws_runtime: tokio::sync::Mutex,
demo_ws_runtime: std::sync::Arc>,
+ demo_ws_manager_runtime: std::sync::Arc>,
+ ws_manager: std::sync::Arc,
http_pool: kb_lib::HttpEndpointPool,
}
@@ -82,12 +85,24 @@ pub fn run() {
panic!("cannot create http endpoint pool: {}", error);
}
};
+ let ws_manager_result = kb_lib::WsManager::from_config(&config);
+ let ws_manager = match ws_manager_result {
+ Ok(ws_manager) => ws_manager,
+ Err(error) => {
+ tracing::error!("cannot create websocket manager: {}", error);
+ panic!("cannot create websocket manager: {}", error);
+ }
+ };
let app_state = KbAppState {
config: config.clone(),
ws_runtime: tokio::sync::Mutex::new(KbWsRuntimeState::new()),
demo_ws_runtime: std::sync::Arc::new(tokio::sync::Mutex::new(
crate::demo_ws::KbDemoWsRuntimeState::new(),
)),
+ demo_ws_manager_runtime: std::sync::Arc::new(tokio::sync::Mutex::new(
+ crate::demo_ws_manager::KbDemoWsManagerRuntimeState::new(),
+ )),
+ ws_manager: std::sync::Arc::new(ws_manager),
http_pool,
};
let tracing_builder = tauri_plugin_tracing::Builder::new();
@@ -106,6 +121,13 @@ pub fn run() {
crate::demo_http::open_demo_http_window,
crate::demo_http::demo_http_list_pool_clients,
crate::demo_http::demo_http_execute_request,
+ crate::demo_ws_manager::open_demo_ws_manager_window,
+ crate::demo_ws_manager::demo_ws_manager_get_snapshot,
+ crate::demo_ws_manager::demo_ws_manager_list_roles,
+ crate::demo_ws_manager::demo_ws_manager_start_all,
+ crate::demo_ws_manager::demo_ws_manager_stop_all,
+ crate::demo_ws_manager::demo_ws_manager_start_role,
+ crate::demo_ws_manager::demo_ws_manager_stop_role,
]);
tauri_builder = tauri_builder.plugin(tracing_builder.build::());
tauri_builder = tauri_builder.setup(|app| {
diff --git a/kb_app/tauri.conf.json b/kb_app/tauri.conf.json
index 203f714..833d2de 100644
--- a/kb_app/tauri.conf.json
+++ b/kb_app/tauri.conf.json
@@ -1,7 +1,7 @@
{
"$schema": "https://schema.tauri.app/config/2",
"productName": "kb-bapp",
- "version": "0.4.4",
+ "version": "0.6.6",
"identifier": "com.sasedev.kb-app",
"build": {
"beforeDevCommand": "npm run dev",
@@ -64,6 +64,20 @@
"create": false,
"transparent": false,
"decorations": true
+ },
+ {
+ "label": "demo_ws_manager",
+ "url": "demo_ws_manager.html",
+ "title": "Demo Ws Manager",
+ "width": 1280,
+ "height": 800,
+ "minWidth": 900,
+ "minHeight": 620,
+ "center": true,
+ "visible": false,
+ "create": false,
+ "transparent": false,
+ "decorations": true
}
],
"security": {
diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs
index 74db560..a976a21 100644
--- a/kb_lib/src/ws_manager.rs
+++ b/kb_lib/src/ws_manager.rs
@@ -139,11 +139,14 @@ impl WsManager {
let endpoint_names = self.endpoint_names_for_role(role).await;
let mut started_count = 0_usize;
for endpoint_name in endpoint_names {
- let start_result = self.start_endpoint(endpoint_name.as_str()).await;
- if let Err(error) = start_result {
- return Err(error);
+ let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await;
+ let started = match start_result {
+ Ok(started) => started,
+ Err(error) => return Err(error),
+ };
+ if started {
+ started_count += 1;
}
- started_count += 1;
}
Ok(started_count)
}
@@ -153,11 +156,14 @@ impl WsManager {
let endpoint_names = self.endpoint_names_for_role(role).await;
let mut stopped_count = 0_usize;
for endpoint_name in endpoint_names {
- let stop_result = self.stop_endpoint(endpoint_name.as_str()).await;
- if let Err(error) = stop_result {
- return Err(error);
+ let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await;
+ let stopped = match stop_result {
+ Ok(stopped) => stopped,
+ Err(error) => return Err(error),
+ };
+ if stopped {
+ stopped_count += 1;
}
- stopped_count += 1;
}
Ok(stopped_count)
}
@@ -172,8 +178,7 @@ impl WsManager {
}
}
- /// Starts one managed endpoint.
- pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
+ async fn start_endpoint_inner(&self, endpoint_name: &str) -> Result {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
@@ -184,6 +189,12 @@ impl WsManager {
)));
}
};
+ let state = client.connection_state().await;
+ if state == crate::KbConnectionState::Connected
+ || state == crate::KbConnectionState::Connecting
+ {
+ return Ok(false);
+ }
let sender_option = {
let sender_guard = self.detection_relay_sender.lock().await;
sender_guard.clone()
@@ -195,11 +206,19 @@ impl WsManager {
if let Err(error) = connect_result {
return Err(error);
}
- Ok(())
+ Ok(true)
}
- /// Stops one managed endpoint.
- pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
+ /// Starts one managed endpoint.
+ pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
+ let start_result = self.start_endpoint_inner(endpoint_name).await;
+ match start_result {
+ Ok(_) => Ok(()),
+ Err(error) => Err(error),
+ }
+ }
+
+ async fn stop_endpoint_inner(&self, endpoint_name: &str) -> Result {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
@@ -210,12 +229,27 @@ impl WsManager {
)));
}
};
+ let state = client.connection_state().await;
+ if state == crate::KbConnectionState::Disconnected
+ || state == crate::KbConnectionState::Disconnecting
+ {
+ return Ok(false);
+ }
client.clear_detection_notification_forwarder().await;
let disconnect_result = client.disconnect().await;
if let Err(error) = disconnect_result {
return Err(error);
}
- Ok(())
+ Ok(true)
+ }
+
+ /// Stops one managed endpoint.
+ pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
+ let stop_result = self.stop_endpoint_inner(endpoint_name).await;
+ match stop_result {
+ Ok(_) => Ok(()),
+ Err(error) => Err(error),
+ }
}
/// Starts all managed endpoints.
@@ -223,11 +257,14 @@ impl WsManager {
let endpoint_names = self.endpoint_names().await;
let mut started_count = 0_usize;
for endpoint_name in endpoint_names {
- let start_result = self.start_endpoint(endpoint_name.as_str()).await;
- if let Err(error) = start_result {
- return Err(error);
+ let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await;
+ let started = match start_result {
+ Ok(started) => started,
+ Err(error) => return Err(error),
+ };
+ if started {
+ started_count += 1;
}
- started_count += 1;
}
Ok(started_count)
}
@@ -237,11 +274,14 @@ impl WsManager {
let endpoint_names = self.endpoint_names().await;
let mut stopped_count = 0_usize;
for endpoint_name in endpoint_names {
- let stop_result = self.stop_endpoint(endpoint_name.as_str()).await;
- if let Err(error) = stop_result {
- return Err(error);
+ let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await;
+ let stopped = match stop_result {
+ Ok(stopped) => stopped,
+ Err(error) => return Err(error),
+ };
+ if stopped {
+ stopped_count += 1;
}
- stopped_count += 1;
}
Ok(stopped_count)
}