From f90ca402023851d97d4c4eabd46d176afa569ac6 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sat, 25 Apr 2026 13:52:07 +0200 Subject: [PATCH] 0.6.5 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 103 ++++- kb_lib/src/lib.rs | 32 +- kb_lib/src/ws_manager.rs | 956 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 1063 insertions(+), 31 deletions(-) create mode 100644 kb_lib/src/ws_manager.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 57e1053..8161508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,3 +28,4 @@ 0.6.2 - Branchement de WsClient vers le pipeline de détection via un relais asynchrone de notifications JSON-RPC WebSocket 0.6.3 - Enrichissement des notifications WebSocket utiles : extraction améliorée de pubkey, signature, owner, parsed account type et slot pour account/logs/signature notifications 0.6.4 - Premières règles de détection technique pour candidats pools/listings depuis programNotification en s’appuyant sur les DEX connus en base +0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection diff --git a/Cargo.toml b/Cargo.toml index 65bfee1..5d20d1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.4" +version = "0.6.5" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 5d4dee3..2d0a8b1 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -427,17 +427,87 @@ Réalisé : - maintien d’une logique encore indépendante des connecteurs DEX `0.7.x`. ### 6.030. Version `0.6.5` — Orchestration multi-clients WebSocket -Objectif : préparer la gestion coordonnée de plusieurs `WsClient`. +Réalisé : + +- introduction d’une abstraction `ws_manager.rs` pour piloter plusieurs `WsClient`, +- construction des clients WS activés depuis la configuration d’endpoints, +- démarrage et arrêt centralisés par endpoint ou globalement, +- republication d’un flux unifié de `WsEvent` pour l’ensemble des clients gérés, +- branchement optionnel du relais de détection WS sur tous les clients orchestrés, +- préparation des futures politiques de répartition, supervision et reconnexion. + +### 6.031. Version `0.7.0` — Résolution transactionnelle orientée DEX +Objectif : relier la détection temps réel aux transactions Solana complètes. À faire : -- introduire une abstraction `ws_pool.rs` ou `ws_manager.rs`, -- piloter plusieurs clients WS selon les rôles d’endpoints configurés, -- centraliser le démarrage, l’arrêt, l’état et les relais de notifications WS, -- préparer les futures politiques de répartition, supervision et reconnexion. +- introduire une file de résolution transactionnelle alimentée par les signatures issues des flux WS, +- corréler `logsNotification`, `programNotification` et `signatureNotification` avec des appels `getTransaction`, +- utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS, +- éviter qu’une notification intéressante reste au niveau d’un simple signal technique sans résolution métier. -### 6.031. Version `0.7.x` — DEX connectors v1 -Objectif : structurer les connecteurs par protocole. +### 6.032. Version `0.7.1` — Modèle transactionnel Solana enrichi +Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slot -> signature -> instruction`. + +À faire : + +- préparer les structures et tables permettant de relier blocs/slots, signatures et instructions, +- distinguer clairement transaction, instruction principale et éventuelles inner instructions, +- conserver la possibilité de relier plus tard un pool, un token ou un wallet à une signature fondatrice, +- préparer l’historique transactionnel nécessaire aux futurs décodeurs DEX. + +### 6.033. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version +Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés. + +À faire : + +- introduire des règles spécifiques à chaque DEX / version de programme, +- détecter les instructions utiles à la création de pools, paires et évènements de liquidité, +- encapsuler les index de comptes et les motifs de logs propres à chaque protocole, +- prévoir des décodeurs séparés au minimum pour Raydium, Pump.fun / PumpSwap, Meteora, puis les autres cibles. + +### 6.034. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction +Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RPC et des transactions enrichies. + +À faire : + +- relier `logsSubscribe` + `signature` + `getTransaction`, +- détecter les créations de pools via motifs et instructions spécifiques par DEX, +- extraire token A, token B, LP mint, vaults et comptes utiles quand cela est possible, +- alimenter `kb_pools`, `kb_pairs`, `kb_pool_tokens` et `kb_pool_listings` avec des données plus fiables que la seule détection de comptes. + +### 6.035. Version `0.7.4` — Modèle métier DEX enrichi +Objectif : faire converger la détection technique et le modèle métier vers une vision proche de l’activité réelle du marché. + +À faire : + +- enrichir le modèle `pool / pair / pool_token / listing` avec les informations utiles à la lecture DEX, +- préparer le rattachement d’une paire à son pool de création et à sa signature de fondation, +- préparer une vision cohérente `token <-> pool <-> pair <-> protocole`, +- distinguer les objets de référence des événements d’activité. + +### 6.036. Version `0.7.5` — Wallets, holdings et participants observés +Objectif : préparer le suivi des acteurs on-chain autour des pools et tokens détectés. + +À faire : + +- préparer le rattachement des signatures, instructions et événements à des wallets observés, +- introduire la notion de holdings utiles au suivi des tokens, +- préparer l’identification des créateurs, mint authorities, wallets d’activité et contreparties, +- éviter de limiter l’analyse future au seul niveau token/pool sans vision des participants. + +### 6.037. Version `0.7.6` — Séries de prix, volumes et agrégats DEX +Objectif : préparer la couche analytique fine à partir des événements métier normalisés. + +À faire : + +- préparer des agrégations de prix/volume par paire, +- introduire la base des futures candles et séries temporelles, +- permettre plus tard le calcul d’OHLCV, volume, nombre de trades et liquidité par fenêtre, +- préparer le terrain pour la couche analytique `0.8.x`. + +### 6.038. Version `0.7.x` — DEX connectors v1 +Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier. Cibles initiales possibles : @@ -448,14 +518,15 @@ Cibles initiales possibles : - FluxBeam - DexLab -À faire : +Résultat attendu : -- identification des programmes, -- décodage des événements utiles, -- création de types métiers propres, -- enrichissement des métadonnées token/pool/pair. +- identification fiable des programmes et versions, +- résolution des signatures pertinentes, +- décodage des transactions utiles, +- création d’objets métier riches pour tokens, pools, paires, wallets, holdings et séries de prix, +- remplacement progressif des scripts heuristiques externes par des composants Rust intégrés. -### 6.032. Version `0.8.x` — Analyse et filtrage +### 6.039. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -466,7 +537,7 @@ Objectif : transformer les événements bruts en signaux exploitables. - statistiques de comportement, - premiers patterns. -### 6.033. Version `1.x.y` — Wallets et swap préparatoire +### 6.040. Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -477,7 +548,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.034. Version `2.x.y` — Trading semi-automatisé +### 6.041. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -488,7 +559,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.035. Version `3.x.y` — Yellowstone gRPC +### 6.042. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index d9db675..875ab31 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -8,19 +8,22 @@ #![deny(unreachable_pub)] #![warn(missing_docs)] -mod config; mod constants; mod error; -mod http_client; +mod config; mod tracing; mod types; mod ws_client; mod json_rpc_ws; mod solana_pubsub_ws; +mod ws_manager; +mod http_client; mod http_pool; mod db; mod detect; +pub use crate::constants::*; +pub use crate::error::KbError; pub use crate::config::KbAppConfig; pub use crate::config::KbConfig; pub use crate::config::KbDataConfig; @@ -30,18 +33,6 @@ pub use crate::config::KbSolanaConfig; pub use crate::config::KbWsEndpointConfig; pub use crate::config::KbDatabaseConfig; pub use crate::config::KbSqliteDatabaseConfig; -pub use crate::constants::*; -pub use crate::error::KbError; -pub use crate::http_client::HttpClient; -pub use crate::http_client::KbJsonRpcHttpErrorObject; -pub use crate::http_client::KbJsonRpcHttpErrorResponse; -pub use crate::http_client::KbJsonRpcHttpRequest; -pub use crate::http_client::KbJsonRpcHttpResponse; -pub use crate::http_client::KbJsonRpcHttpSuccessResponse; -pub use crate::http_client::KbHttpEndpointStatus; -pub use crate::http_client::KbHttpMethodClass; -pub use crate::http_client::parse_kb_json_rpc_http_response_text; -pub use crate::http_client::parse_kb_json_rpc_http_response_value; pub use crate::tracing::KbTracingGuard; pub use crate::tracing::init_tracing; pub use crate::types::KbConnectionState; @@ -62,6 +53,19 @@ pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use crate::solana_pubsub_ws::KbSolanaWsTypedNotification; pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification; pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; +pub use crate::ws_manager::WsManagedEndpointSnapshot; +pub use crate::ws_manager::WsManager; +pub use crate::ws_manager::WsManagerSnapshot; +pub use crate::http_client::HttpClient; +pub use crate::http_client::KbJsonRpcHttpErrorObject; +pub use crate::http_client::KbJsonRpcHttpErrorResponse; +pub use crate::http_client::KbJsonRpcHttpRequest; +pub use crate::http_client::KbJsonRpcHttpResponse; +pub use crate::http_client::KbJsonRpcHttpSuccessResponse; +pub use crate::http_client::KbHttpEndpointStatus; +pub use crate::http_client::KbHttpMethodClass; +pub use crate::http_client::parse_kb_json_rpc_http_response_text; +pub use crate::http_client::parse_kb_json_rpc_http_response_value; pub use crate::http_pool::HttpEndpointPool; pub use crate::http_pool::KbHttpPoolClientSnapshot; pub use crate::db::KbDatabase; diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs new file mode 100644 index 0000000..74db560 --- /dev/null +++ b/kb_lib/src/ws_manager.rs @@ -0,0 +1,956 @@ +// file: kb_lib/src/ws_manager.rs + +//! Multi-client WebSocket orchestration. +//! +//! This module provides a thin orchestration layer above [`crate::WsClient`]. +//! It builds and manages multiple WebSocket clients, republishes a unified +//! event stream, and can attach one shared detection relay to all managed +//! clients. + +/// Snapshot of one managed endpoint. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct WsManagedEndpointSnapshot { + /// Endpoint logical name. + pub endpoint_name: std::string::String, + /// Endpoint resolved URL. + pub resolved_url: std::string::String, + /// Endpoint provider name. + pub provider: std::string::String, + /// Current connection state. + pub state: crate::KbConnectionState, + /// Number of active subscriptions currently tracked by the client. + pub active_subscription_count: usize, +} + +/// Snapshot of the whole manager state. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct WsManagerSnapshot { + /// Number of managed endpoints. + pub endpoint_count: usize, + /// Number of endpoints whose state is not `Disconnected`. + pub started_count: usize, + /// Per-endpoint snapshot list. + pub endpoints: std::vec::Vec, +} + +#[derive(Debug)] +struct WsManagedClient { + client: crate::WsClient, + event_forward_abort_handle: tokio::task::AbortHandle, +} + +/// Multi-client WebSocket orchestrator. +#[derive(Debug)] +pub struct WsManager { + clients: tokio::sync::Mutex>, + event_tx: tokio::sync::broadcast::Sender, + detection_relay_sender: tokio::sync::Mutex< + std::option::Option>, + >, + detection_relay_abort_handle: tokio::sync::Mutex>, +} + +impl WsManager { + /// Builds one manager from a slice of WebSocket endpoint configurations. + /// + /// Only enabled endpoints are retained. + pub fn from_ws_endpoints( + endpoints: &[crate::KbWsEndpointConfig], + ) -> Result { + let mut selected_endpoints = std::vec::Vec::new(); + let mut max_event_capacity = 1_usize; + for endpoint in endpoints { + if endpoint.enabled { + selected_endpoints.push(endpoint.clone()); + if endpoint.event_channel_capacity > max_event_capacity { + max_event_capacity = endpoint.event_channel_capacity; + } + } + } + let channel_result = tokio::sync::broadcast::channel(max_event_capacity); + let (event_tx, _) = channel_result; + let mut clients = std::collections::BTreeMap::new(); + for endpoint in selected_endpoints { + if clients.contains_key(endpoint.name.as_str()) { + return Err(crate::KbError::Config(format!( + "duplicate websocket endpoint name '{}'", + endpoint.name + ))); + } + let client_result = crate::WsClient::new(endpoint.clone()); + let client = match client_result { + Ok(client) => client, + Err(error) => return Err(error), + }; + let event_forward_abort_handle = + spawn_event_forward_task(client.clone(), event_tx.clone()); + clients.insert( + endpoint.name.clone(), + WsManagedClient { + client, + event_forward_abort_handle, + }, + ); + } + Ok(Self { + clients: tokio::sync::Mutex::new(clients), + event_tx, + detection_relay_sender: tokio::sync::Mutex::new(None), + detection_relay_abort_handle: tokio::sync::Mutex::new(None), + }) + } + + /// Builds one manager from the application configuration. + pub fn from_config(config: &crate::KbConfig) -> Result { + Self::from_ws_endpoints(&config.solana.ws_endpoints) + } + + /// Returns a unified broadcast receiver for all managed client events. + pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver { + self.event_tx.subscribe() + } + + /// Returns the list of managed endpoint names. + pub async fn endpoint_names(&self) -> std::vec::Vec { + let clients_guard = self.clients.lock().await; + clients_guard.keys().cloned().collect() + } + + /// Returns the list of managed endpoint names having the requested role. + pub async fn endpoint_names_for_role(&self, role: &str) -> std::vec::Vec { + let clients_guard = self.clients.lock().await; + let mut endpoint_names = std::vec::Vec::new(); + for (endpoint_name, managed) in &*clients_guard { + if managed + .client + .endpoint_config() + .roles + .iter() + .any(|configured_role| configured_role == role) + { + endpoint_names.push(endpoint_name.clone()); + } + } + endpoint_names + } + + /// Starts all managed endpoints having the requested role. + pub async fn start_role(&self, role: &str) -> Result { + let endpoint_names = self.endpoint_names_for_role(role).await; + let mut started_count = 0_usize; + for endpoint_name in endpoint_names { + let start_result = self.start_endpoint(endpoint_name.as_str()).await; + if let Err(error) = start_result { + return Err(error); + } + started_count += 1; + } + Ok(started_count) + } + + /// Stops all managed endpoints having the requested role. + pub async fn stop_role(&self, role: &str) -> Result { + let endpoint_names = self.endpoint_names_for_role(role).await; + let mut stopped_count = 0_usize; + for endpoint_name in endpoint_names { + let stop_result = self.stop_endpoint(endpoint_name.as_str()).await; + if let Err(error) = stop_result { + return Err(error); + } + stopped_count += 1; + } + Ok(stopped_count) + } + + /// Returns one managed client by endpoint name. + pub async fn client(&self, endpoint_name: &str) -> std::option::Option { + let clients_guard = self.clients.lock().await; + let managed_option = clients_guard.get(endpoint_name); + match managed_option { + Some(managed) => Some(managed.client.clone()), + None => None, + } + } + + /// Starts one managed endpoint. + pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + let client_option = self.client(endpoint_name).await; + let client = match client_option { + Some(client) => client, + None => { + return Err(crate::KbError::InvalidState(format!( + "unknown managed websocket endpoint '{}'", + endpoint_name + ))); + } + }; + let sender_option = { + let sender_guard = self.detection_relay_sender.lock().await; + sender_guard.clone() + }; + if let Some(sender) = sender_option { + client.set_detection_notification_forwarder(sender).await; + } + let connect_result = client.connect().await; + if let Err(error) = connect_result { + return Err(error); + } + Ok(()) + } + + /// Stops one managed endpoint. + pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> { + let client_option = self.client(endpoint_name).await; + let client = match client_option { + Some(client) => client, + None => { + return Err(crate::KbError::InvalidState(format!( + "unknown managed websocket endpoint '{}'", + endpoint_name + ))); + } + }; + client.clear_detection_notification_forwarder().await; + let disconnect_result = client.disconnect().await; + if let Err(error) = disconnect_result { + return Err(error); + } + Ok(()) + } + + /// Starts all managed endpoints. + pub async fn start_all(&self) -> Result { + let endpoint_names = self.endpoint_names().await; + let mut started_count = 0_usize; + for endpoint_name in endpoint_names { + let start_result = self.start_endpoint(endpoint_name.as_str()).await; + if let Err(error) = start_result { + return Err(error); + } + started_count += 1; + } + Ok(started_count) + } + + /// Stops all managed endpoints. + pub async fn stop_all(&self) -> Result { + let endpoint_names = self.endpoint_names().await; + let mut stopped_count = 0_usize; + for endpoint_name in endpoint_names { + let stop_result = self.stop_endpoint(endpoint_name.as_str()).await; + if let Err(error) = stop_result { + return Err(error); + } + stopped_count += 1; + } + Ok(stopped_count) + } + + /// Returns the number of active subscriptions for one endpoint. + pub async fn active_subscription_count( + &self, + endpoint_name: &str, + ) -> Result { + let client_option = self.client(endpoint_name).await; + let client = match client_option { + Some(client) => client, + None => { + return Err(crate::KbError::InvalidState(format!( + "unknown managed websocket endpoint '{}'", + endpoint_name + ))); + } + }; + Ok(client.active_subscription_count().await) + } + + /// Returns a consolidated snapshot of all managed endpoints. + pub async fn snapshot(&self) -> Result { + let clients_to_snapshot = { + let clients_guard = self.clients.lock().await; + let mut values = std::vec::Vec::new(); + for (endpoint_name, managed) in &*clients_guard { + values.push((endpoint_name.clone(), managed.client.clone())); + } + values + }; + let mut endpoints = std::vec::Vec::new(); + let mut started_count = 0_usize; + for (endpoint_name, client) in clients_to_snapshot { + let state = client.connection_state().await; + let active_subscription_count = client.active_subscription_count().await; + + if state != crate::KbConnectionState::Disconnected { + started_count += 1; + } + endpoints.push(WsManagedEndpointSnapshot { + endpoint_name, + resolved_url: client.endpoint_url().to_string(), + provider: client.endpoint_config().provider.clone(), + state, + active_subscription_count, + }); + } + Ok(WsManagerSnapshot { + endpoint_count: endpoints.len(), + started_count, + endpoints, + }) + } + + /// Attaches one shared detection relay to all managed clients. + pub async fn attach_detection_relay( + &self, + database: std::sync::Arc, + queue_capacity: usize, + ) -> Result<(), crate::KbError> { + { + let sender_guard = self.detection_relay_sender.lock().await; + if sender_guard.is_some() { + return Err(crate::KbError::InvalidState( + "websocket detection relay is already attached".to_string(), + )); + } + } + let persistence = crate::KbDetectionPersistenceService::new(database); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let relay = crate::KbWsDetectionRelay::new(detector); + let (sender, receiver) = crate::KbWsDetectionRelay::channel(queue_capacity); + let relay_task = relay.spawn(receiver); + let relay_abort_handle = relay_task.abort_handle(); + { + let mut sender_guard = self.detection_relay_sender.lock().await; + *sender_guard = Some(sender.clone()); + } + { + let mut abort_guard = self.detection_relay_abort_handle.lock().await; + *abort_guard = Some(relay_abort_handle); + } + let clients = { + let clients_guard = self.clients.lock().await; + let mut values = std::vec::Vec::new(); + for managed in clients_guard.values() { + values.push(managed.client.clone()); + } + + values + }; + for client in clients { + client + .set_detection_notification_forwarder(sender.clone()) + .await; + } + Ok(()) + } + + /// Detaches the shared detection relay from all managed clients. + pub async fn detach_detection_relay(&self) -> Result<(), crate::KbError> { + let clients = { + let clients_guard = self.clients.lock().await; + let mut values = std::vec::Vec::new(); + for managed in clients_guard.values() { + values.push(managed.client.clone()); + } + values + }; + for client in clients { + client.clear_detection_notification_forwarder().await; + } + { + let mut sender_guard = self.detection_relay_sender.lock().await; + *sender_guard = None; + } + let abort_handle_option = { + let mut abort_guard = self.detection_relay_abort_handle.lock().await; + abort_guard.take() + }; + if let Some(abort_handle) = abort_handle_option { + abort_handle.abort(); + } + Ok(()) + } + + /// Returns whether one managed endpoint exists. + pub async fn has_endpoint(&self, endpoint_name: &str) -> bool { + let clients_guard = self.clients.lock().await; + clients_guard.contains_key(endpoint_name) + } + + /// Returns the current connection state of one endpoint. + pub async fn endpoint_state( + &self, + endpoint_name: &str, + ) -> Result { + let client_option = self.client(endpoint_name).await; + let client = match client_option { + Some(client) => client, + None => { + return Err(crate::KbError::InvalidState(format!( + "unknown managed websocket endpoint '{}'", + endpoint_name + ))); + } + }; + Ok(client.connection_state().await) + } +} + +impl Drop for WsManager { + fn drop(&mut self) { + let clients_result = self.clients.try_lock(); + if let Ok(mut clients_guard) = clients_result { + for managed in clients_guard.values_mut() { + managed.event_forward_abort_handle.abort(); + } + } + let relay_abort_result = self.detection_relay_abort_handle.try_lock(); + if let Ok(mut abort_guard) = relay_abort_result { + if let Some(abort_handle) = abort_guard.take() { + abort_handle.abort(); + } + } + } +} + +fn spawn_event_forward_task( + client: crate::WsClient, + event_tx: tokio::sync::broadcast::Sender, +) -> tokio::task::AbortHandle { + let mut receiver = client.subscribe_events(); + let task = tokio::spawn(async move { + loop { + let recv_result = receiver.recv().await; + match recv_result { + Ok(event) => { + let _ = event_tx.send(event); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } + } + } + }); + + task.abort_handle() +} + +#[cfg(test)] +mod tests { + #[derive(Debug)] + struct TestWsServer { + url: std::string::String, + shutdown_tx: std::option::Option>, + } + + impl TestWsServer { + async fn spawn_echo_server() -> Self { + let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; + let listener = match listener_result { + Ok(listener) => listener, + Err(error) => panic!("tcp bind failed: {error}"), + }; + let local_addr_result = listener.local_addr(); + let local_addr = match local_addr_result { + Ok(local_addr) => local_addr, + Err(error) => panic!("local_addr failed: {error}"), + }; + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + } + accept_result = listener.accept() => { + let (stream, _) = match accept_result { + Ok(pair) => pair, + Err(_) => break, + }; + tokio::spawn(async move { + let accept_result = tokio_tungstenite::accept_async(stream).await; + let mut websocket = match accept_result { + Ok(websocket) => websocket, + Err(_) => return, + }; + loop { + let next_result = futures_util::StreamExt::next(&mut websocket).await; + let message_result = match next_result { + Some(message_result) => message_result, + None => break, + }; + let message = match message_result { + Ok(message) => message, + Err(_) => break, + }; + match message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Text(text), + ).await; + if send_result.is_err() { + break; + } + } + tokio_tungstenite::tungstenite::Message::Ping(data) => { + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Pong(data), + ).await; + if send_result.is_err() { + break; + } + } + tokio_tungstenite::tungstenite::Message::Close(frame) => { + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Close(frame), + ).await; + let _ = send_result; + break; + } + tokio_tungstenite::tungstenite::Message::Binary(_) => {} + tokio_tungstenite::tungstenite::Message::Pong(_) => {} + tokio_tungstenite::tungstenite::Message::Frame(_) => {} + } + } + }); + } + } + } + }); + Self { + url: format!("ws://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + } + } + + async fn spawn_json_rpc_server() -> Self { + let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; + let listener = match listener_result { + Ok(listener) => listener, + Err(error) => panic!("tcp bind failed: {error}"), + }; + let local_addr_result = listener.local_addr(); + let local_addr = match local_addr_result { + Ok(local_addr) => local_addr, + Err(error) => panic!("local_addr failed: {error}"), + }; + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + } + accept_result = listener.accept() => { + let (stream, _) = match accept_result { + Ok(pair) => pair, + Err(_) => break, + }; + tokio::spawn(async move { + let accept_result = tokio_tungstenite::accept_async(stream).await; + let mut websocket = match accept_result { + Ok(websocket) => websocket, + Err(_) => return, + }; + loop { + let next_result = futures_util::StreamExt::next(&mut websocket).await; + let message_result = match next_result { + Some(message_result) => message_result, + None => break, + }; + let message = match message_result { + Ok(message) => message, + Err(_) => break, + }; + match message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + let parse_result: Result = + serde_json::from_str(text.as_str()); + let request_value = match parse_result { + Ok(request_value) => request_value, + Err(_) => continue, + }; + let method_option = request_value + .get("method") + .and_then(serde_json::Value::as_str); + let method = match method_option { + Some(method) => method, + None => continue, + }; + if method == "slotSubscribe" { + let id_value = match request_value.get("id") { + Some(id_value) => id_value.clone(), + None => serde_json::Value::from(1_u64), + }; + let success_text = serde_json::json!({ + "jsonrpc": "2.0", + "result": 77_u64, + "id": id_value, + }).to_string(); + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Text(success_text.into()), + ).await; + if send_result.is_err() { + break; + } + let notification_text = serde_json::json!({ + "jsonrpc": "2.0", + "method": "slotNotification", + "params": { + "result": { + "slot": 12_u64, + "parent": 11_u64, + "root": 10_u64 + }, + "subscription": 77_u64 + } + }).to_string(); + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Text(notification_text.into()), + ).await; + if send_result.is_err() { + break; + } + } else if method == "slotUnsubscribe" { + let id_value = match request_value.get("id") { + Some(id_value) => id_value.clone(), + None => serde_json::Value::from(2_u64), + }; + let success_text = serde_json::json!({ + "jsonrpc": "2.0", + "result": true, + "id": id_value, + }).to_string(); + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Text(success_text.into()), + ).await; + if send_result.is_err() { + break; + } + } + } + tokio_tungstenite::tungstenite::Message::Ping(data) => { + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Pong(data), + ).await; + if send_result.is_err() { + break; + } + } + tokio_tungstenite::tungstenite::Message::Close(frame) => { + let send_result = futures_util::SinkExt::send( + &mut websocket, + tokio_tungstenite::tungstenite::Message::Close(frame), + ).await; + let _ = send_result; + break; + } + tokio_tungstenite::tungstenite::Message::Binary(_) => {} + tokio_tungstenite::tungstenite::Message::Pong(_) => {} + tokio_tungstenite::tungstenite::Message::Frame(_) => {} + } + } + }); + } + } + } + }); + Self { + url: format!("ws://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + } + } + + async fn shutdown(mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } + } + + fn make_ws_endpoint( + name: &str, + url: std::string::String, + enabled: bool, + ) -> crate::KbWsEndpointConfig { + crate::KbWsEndpointConfig { + name: name.to_string(), + enabled, + provider: "test".to_string(), + url, + api_key_env_var: None, + roles: vec!["test".to_string()], + max_subscriptions: 16, + connect_timeout_ms: 2000, + request_timeout_ms: 2000, + unsubscribe_timeout_ms: 1000, + write_channel_capacity: 32, + event_channel_capacity: 64, + auto_reconnect: false, + } + } + + async fn recv_manager_event( + receiver: &mut tokio::sync::broadcast::Receiver, + ) -> crate::WsEvent { + let timeout_result = + tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await; + let recv_result = match timeout_result { + Ok(recv_result) => recv_result, + Err(_) => panic!("manager event receive timeout"), + }; + match recv_result { + Ok(event) => event, + Err(error) => panic!("manager event receive failed: {error}"), + } + } + + async fn create_database() -> crate::KbDatabase { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir failed: {error}"), + }; + let database_path = tempdir.path().join("ws_manager.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + match database_result { + Ok(database) => database, + Err(error) => panic!("database init failed: {error}"), + } + } + + #[tokio::test] + async fn from_ws_endpoints_builds_only_enabled_clients() { + let endpoints = vec![ + make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true), + make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false), + make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true), + ]; + let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_ws_endpoints failed: {error}"), + }; + let endpoint_names = manager.endpoint_names().await; + assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]); + } + + #[tokio::test] + async fn start_all_connects_all_clients_and_republishes_events() { + let server_a = TestWsServer::spawn_echo_server().await; + let server_b = TestWsServer::spawn_echo_server().await; + let endpoints = vec![ + make_ws_endpoint("ws_a", server_a.url.clone(), true), + make_ws_endpoint("ws_b", server_b.url.clone(), true), + ]; + let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_ws_endpoints failed: {error}"), + }; + let mut receiver = manager.subscribe_events(); + let start_result = manager.start_all().await; + let started_count = match start_result { + Ok(started_count) => started_count, + Err(error) => panic!("start_all failed: {error}"), + }; + assert_eq!(started_count, 2); + let mut connected_names = std::collections::BTreeSet::new(); + for _ in 0..4 { + let event = recv_manager_event(&mut receiver).await; + if let crate::WsEvent::Connected { endpoint_name, .. } = event { + connected_names.insert(endpoint_name); + } + if connected_names.len() == 2 { + break; + } + } + assert!(connected_names.contains("ws_a")); + assert!(connected_names.contains("ws_b")); + let stop_result = manager.stop_all().await; + if let Err(error) = stop_result { + panic!("stop_all failed: {error}"); + } + server_a.shutdown().await; + server_b.shutdown().await; + } + + #[tokio::test] + async fn snapshot_reports_states() { + let server = TestWsServer::spawn_echo_server().await; + let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)]; + let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_ws_endpoints failed: {error}"), + }; + let start_result = manager.start_all().await; + if let Err(error) = start_result { + panic!("start_all failed: {error}"); + } + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let snapshot_result = manager.snapshot().await; + let snapshot = match snapshot_result { + Ok(snapshot) => snapshot, + Err(error) => panic!("snapshot failed: {error}"), + }; + assert_eq!(snapshot.endpoint_count, 1); + assert_eq!(snapshot.started_count, 1); + assert_eq!(snapshot.endpoints.len(), 1); + assert_eq!(snapshot.endpoints[0].endpoint_name, "ws_a"); + assert_eq!( + snapshot.endpoints[0].state, + crate::KbConnectionState::Connected + ); + let stop_result = manager.stop_all().await; + if let Err(error) = stop_result { + panic!("stop_all failed: {error}"); + } + server.shutdown().await; + } + + #[tokio::test] + async fn attach_detection_relay_allows_managed_client_forwarding() { + let server = TestWsServer::spawn_json_rpc_server().await; + let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)]; + let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_ws_endpoints failed: {error}"), + }; + let database = create_database().await; + let database = std::sync::Arc::new(database); + let attach_result = manager.attach_detection_relay(database.clone(), 16).await; + if let Err(error) = attach_result { + panic!("attach_detection_relay failed: {error}"); + } + let start_result = manager.start_all().await; + if let Err(error) = start_result { + panic!("start_all failed: {error}"); + } + let client_option = manager.client("ws_a").await; + let client = match client_option { + Some(client) => client, + None => panic!("client must exist"), + }; + let subscribe_result = client + .send_json_rpc_request("slotSubscribe".to_string(), std::vec::Vec::new()) + .await; + if let Err(error) = subscribe_result { + panic!("slotSubscribe failed: {error}"); + } + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + let observations_result = + crate::list_recent_onchain_observations(database.as_ref(), 10).await; + let observations = match observations_result { + Ok(observations) => observations, + Err(error) => panic!("list_recent_onchain_observations failed: {error}"), + }; + assert_eq!(observations.len(), 1); + assert_eq!(observations[0].observation_kind, "ws.slot_notification"); + let stop_result = manager.stop_all().await; + if let Err(error) = stop_result { + panic!("stop_all failed: {error}"); + } + let detach_result = manager.detach_detection_relay().await; + if let Err(error) = detach_result { + panic!("detach_detection_relay failed: {error}"); + } + server.shutdown().await; + } + + #[tokio::test] + async fn from_config_builds_only_enabled_clients() { + let config = crate::KbConfig { + app: crate::KbAppConfig { + name: "test".to_string(), + environment: "test".to_string(), + auto_reconnect_default: false, + }, + logging: crate::KbLoggingConfig { + level: "debug".to_string(), + console_enabled: true, + console_ansi: true, + file_enabled: false, + directory: "./logs".to_string(), + file_prefix: "app".to_string(), + rotation: "daily".to_string(), + message_format: "compact".to_string(), + time_format: "rfc3339_millis".to_string(), + target_filters: std::collections::BTreeMap::new(), + }, + data: crate::KbDataConfig { + sqlite_path: "data/test.sqlite3".to_string(), + wallets_directory: "wallets".to_string(), + }, + solana: crate::KbSolanaConfig { + http_endpoints: std::vec::Vec::new(), + ws_endpoints: vec![ + make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true), + make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false), + make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true), + ], + }, + database: crate::KbDatabaseConfig { + enabled: false, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: "data/test.sqlite3".to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: false, + use_wal: false, + }, + }, + }; + let manager_result = crate::WsManager::from_config(&config); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_config failed: {error}"), + }; + let endpoint_names = manager.endpoint_names().await; + assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]); + } + + #[tokio::test] + async fn endpoint_names_for_role_filters_managed_clients() { + let mut ws_a = make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true); + ws_a.roles = vec!["slots".to_string(), "general".to_string()]; + let mut ws_b = make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), true); + ws_b.roles = vec!["programs".to_string()]; + let mut ws_c = make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true); + ws_c.roles = vec!["slots".to_string()]; + let endpoints = vec![ws_a, ws_b, ws_c]; + let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); + let manager = match manager_result { + Ok(manager) => manager, + Err(error) => panic!("from_ws_endpoints failed: {error}"), + }; + let slot_endpoints = manager.endpoint_names_for_role("slots").await; + let program_endpoints = manager.endpoint_names_for_role("programs").await; + let unknown_endpoints = manager.endpoint_names_for_role("unknown").await; + assert_eq!(slot_endpoints, vec!["ws_a".to_string(), "ws_c".to_string()]); + assert_eq!(program_endpoints, vec!["ws_b".to_string()]); + assert!(unknown_endpoints.is_empty()); + } +}