2026-04-25 13:52:07 +02:00
parent d4de95e6db
commit f90ca40202
5 changed files with 1063 additions and 31 deletions

View File

@@ -28,3 +28,4 @@
0.6.2 - Wired WsClient into the detection pipeline through an asynchronous relay of JSON-RPC WebSocket notifications
0.6.3 - Enriched the useful WebSocket notifications: improved extraction of pubkey, signature, owner, parsed account type and slot for account/logs/signature notifications
0.6.4 - First technical detection rules for pool/listing candidates from programNotification, relying on the known DEXes stored in the database
0.6.5 - Added ws_manager.rs for multi-client WebSocket orchestration, the unified event bus, and centralized wiring of the detection relay

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.6.4"
version = "0.6.5"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -427,17 +427,87 @@ Done:
- kept the logic still independent from the `0.7.x` DEX connectors.
### 6.030. Version `0.6.5` — Multi-client WebSocket orchestration
Goal: prepare coordinated management of several `WsClient` instances.
Done:
- introduced a `ws_manager.rs` abstraction to drive several `WsClient` instances,
- built the enabled WS clients from the endpoint configuration,
- centralized start and stop, per endpoint or globally,
- republished a unified `WsEvent` stream for all managed clients,
- optionally attached the WS detection relay to all orchestrated clients,
- prepared future load-distribution, supervision, and reconnection policies (a usage sketch follows this list).
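As an illustration, a minimal usage sketch of the new manager; it assumes the library crate is consumed as `kb_lib` and that `WsEvent` implements `Debug`, while every other name comes from this commit:

```rust
use std::sync::Arc;

/// Builds the manager from configuration, attaches the shared detection
/// relay, then drains the unified event bus (error handling shortened to `?`).
async fn run_ws_layer(
    config: &kb_lib::KbConfig,
    database: Arc<kb_lib::KbDatabase>,
) -> Result<(), kb_lib::KbError> {
    // One WsClient per enabled endpoint in `solana.ws_endpoints`.
    let manager = kb_lib::WsManager::from_config(config)?;
    // Shared detection relay with a queue capacity of 64 envelopes.
    manager.attach_detection_relay(database, 64).await?;
    let started = manager.start_all().await?;
    tracing::info!("started {started} websocket endpoints");
    // Unified broadcast stream republishing every managed client's events.
    let mut events = manager.subscribe_events();
    while let Ok(event) = events.recv().await {
        tracing::debug!(?event, "unified ws event");
    }
    manager.stop_all().await?;
    manager.detach_detection_relay().await?;
    Ok(())
}
```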
### 6.031. Version `0.7.0` — DEX-oriented transaction resolution
Goal: connect real-time detection to full Solana transactions.
To do:
- introduce a `ws_pool.rs` or `ws_manager.rs` abstraction,
- drive several WS clients according to the configured endpoint roles,
- centralize start, stop, state, and the WS notification relays,
- prepare future load-distribution, supervision, and reconnection policies.
- introduce a transaction resolution queue fed by the signatures coming from the WS streams (sketched after this list),
- correlate `logsNotification`, `programNotification`, and `signatureNotification` with `getTransaction` calls,
- use the existing HTTP pool to enrich the signals detected on the WS side,
- avoid leaving an interesting notification as a mere technical signal without business-level resolution.
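None of this queue exists yet; the following is a hypothetical sketch of the intended data flow, where `PendingSignature`, `spawn_resolution_worker`, and `resolve_transaction` are illustrative names only, the latter standing in for a `getTransaction` call routed through the existing HTTP pool:

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// One signature captured from a logs/program/signature notification
/// (hypothetical type, not part of the codebase).
#[derive(Debug)]
pub struct PendingSignature {
    pub signature: String,
    pub source_endpoint: String,
}

/// Drains the queue and resolves each signature into a full transaction.
pub fn spawn_resolution_worker<F, Fut>(
    mut queue_rx: mpsc::Receiver<PendingSignature>,
    resolve_transaction: F,
) -> JoinHandle<()>
where
    F: Fn(String) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Result<serde_json::Value, String>> + Send + 'static,
{
    tokio::spawn(async move {
        while let Some(pending) = queue_rx.recv().await {
            // Correlate the WS signal with the resolved transaction so the
            // notification does not stay a purely technical signal.
            match resolve_transaction(pending.signature.clone()).await {
                Ok(_transaction_json) => { /* hand off to decoding / persistence */ }
                Err(error) => {
                    tracing::warn!(%error, signature = %pending.signature, "resolution failed");
                }
            }
        }
    })
}
```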
### 6.031. Version `0.7.x` — DEX connectors v1
Goal: structure the connectors by protocol.
### 6.032. Version `0.7.1` — Enriched Solana transaction model
Goal: prepare a richer internal model built around a `slot -> signature -> instruction` view.
To do:
- prepare the structures and tables linking blocks/slots, signatures, and instructions (a model sketch follows this list),
- clearly distinguish the transaction, its top-level instructions, and any inner instructions,
- keep the option of later linking a pool, a token, or a wallet to a founding signature,
- prepare the transaction history needed by the future DEX decoders.
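A hypothetical shape for that hierarchy, with field names that are assumptions rather than the final schema:

```rust
/// One observed transaction anchored to its slot (illustrative only).
#[derive(Debug, Clone)]
pub struct KbObservedTransaction {
    pub slot: u64,
    pub signature: String,
    pub instructions: Vec<KbObservedInstruction>,
}

/// One instruction, either top-level or inner (illustrative only).
#[derive(Debug, Clone)]
pub struct KbObservedInstruction {
    pub program_id: String,
    /// Position of the instruction inside the transaction.
    pub instruction_index: u16,
    /// `None` for a top-level instruction, `Some(parent)` for an inner one.
    pub parent_instruction_index: Option<u16>,
    pub account_indexes: Vec<u8>,
    pub data_base58: String,
}
```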
### 6.033. Version `0.7.2` — DEX-specific decoders per program and version
Goal: replace ad-hoc heuristics with real dedicated Rust decoders.
To do:
- introduce rules specific to each DEX / program version (a decoder-interface sketch follows this list),
- detect the instructions relevant to pool creation, pairs, and liquidity events,
- encapsulate the account indexes and log patterns specific to each protocol,
- plan separate decoders at least for Raydium, Pump.fun / PumpSwap, and Meteora, then the other targets.
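One possible decoder interface, sketched under the assumption that decoders consume transactions resolved as JSON; the trait and event type are illustrative, not existing code:

```rust
/// A business-level event recovered from one decoded transaction (illustrative).
#[derive(Debug)]
pub enum KbDexEvent {
    PoolCreated { pool: String, token_a: String, token_b: String },
    LiquidityChanged { pool: String },
}

/// One decoder per DEX program, and per program version when layouts differ.
pub trait KbDexDecoder: Send + Sync {
    /// The on-chain program id this decoder understands.
    fn program_id(&self) -> &str;
    /// Cheap pre-filter on log lines before paying for full decoding.
    fn matches_logs(&self, log_messages: &[String]) -> bool;
    /// Full decoding of one resolved transaction into business events.
    fn decode(&self, transaction_json: &serde_json::Value) -> Vec<KbDexEvent>;
}
```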
### 6.034. Version `0.7.3` — Detecting new pools and pairs via logs + transaction
Goal: quickly detect new pairs/pools from the RPC streams and the enriched transactions.
To do:
- chain `logsSubscribe` + `signature` + `getTransaction`,
- detect pool creations via DEX-specific patterns and instructions,
- extract token A, token B, LP mint, vaults, and other useful accounts when possible,
- feed `kb_pools`, `kb_pairs`, `kb_pool_tokens`, and `kb_pool_listings` with data more reliable than account detection alone.
### 6.035. Version `0.7.4` — Enriched DEX business model
Goal: converge technical detection and the business model toward a view close to real market activity.
To do:
- enrich the `pool / pair / pool_token / listing` model with the information needed to read DEX activity,
- prepare attaching a pair to its creation pool and its founding signature (sketched after this list),
- prepare a consistent `token <-> pool <-> pair <-> protocol` view,
- distinguish reference objects from activity events.
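A hypothetical sketch of that linkage; the `KbPoolRef` / `KbPairRef` names are illustrative and not the final `kb_*` schema:

```rust
/// Reference to a pool and its origin (illustrative only).
#[derive(Debug, Clone)]
pub struct KbPoolRef {
    pub pool_address: String,
    /// Protocol owning the pool program, e.g. "raydium" or "meteora".
    pub protocol: String,
    /// Signature of the transaction that created the pool, when known.
    pub founding_signature: Option<String>,
}

/// A trading pair attached to its creation pool (illustrative only).
#[derive(Debug, Clone)]
pub struct KbPairRef {
    pub base_token_mint: String,
    pub quote_token_mint: String,
    /// Pool in which this pair was first observed.
    pub creation_pool: KbPoolRef,
}
```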
### 6.036. Version `0.7.5` — Wallets, holdings, and observed participants
Goal: prepare tracking of the on-chain actors around the detected pools and tokens.
To do:
- prepare attaching signatures, instructions, and events to observed wallets,
- introduce the notion of holdings useful for token tracking,
- prepare identifying creators, mint authorities, active wallets, and counterparties,
- avoid restricting future analysis to the token/pool level alone, with no view of the participants.
### 6.037. Version `0.7.6` — Price series, volumes, and DEX aggregates
Goal: prepare the fine-grained analytics layer on top of the normalized business events.
To do:
- prepare per-pair price/volume aggregations,
- introduce the foundation for future candles and time series (a candle sketch follows this list),
- later allow computing OHLCV, volume, trade count, and liquidity per window,
- lay the groundwork for the `0.8.x` analytics layer.
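A minimal candle-accumulation sketch, assuming trades arrive as `(price, amount)` pairs already bucketed by time window; the `KbCandle` type is illustrative:

```rust
/// One OHLCV candle for a fixed time window (illustrative only).
#[derive(Debug, Clone)]
pub struct KbCandle {
    pub window_start_unix: i64,
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub trade_count: u64,
}

impl KbCandle {
    /// Opens a candle from the first trade of the window.
    pub fn open_with(window_start_unix: i64, price: f64, amount: f64) -> Self {
        Self {
            window_start_unix,
            open: price,
            high: price,
            low: price,
            close: price,
            volume: amount,
            trade_count: 1,
        }
    }

    /// Folds one more trade of the same window into the candle.
    pub fn apply_trade(&mut self, price: f64, amount: f64) {
        self.high = self.high.max(price);
        self.low = self.low.min(price);
        self.close = price;
        self.volume += amount;
        self.trade_count += 1;
    }
}
```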
### 6.038. Version `0.7.x` — DEX connectors v1
Goal: structure the DEX connectors around a complete pipeline of resolution, decoding, and business normalization.
Possible initial targets:
@@ -448,14 +518,15 @@ Possible initial targets:
- FluxBeam
- DexLab
To do:
Expected result:
- program identification,
- decoding of the useful events,
- creation of clean business types,
- enrichment of token/pool/pair metadata.
- reliable identification of programs and versions,
- resolution of the relevant signatures,
- decoding of the useful transactions,
- creation of rich business objects for tokens, pools, pairs, wallets, holdings, and price series,
- progressive replacement of the external heuristic scripts by integrated Rust components.
### 6.032. Version `0.8.x` — Analysis and filtering
### 6.039. Version `0.8.x` — Analysis and filtering
Goal: turn raw events into actionable signals.
To do:
@@ -466,7 +537,7 @@ Goal: turn raw events into actionable signals.
- behavior statistics,
- first patterns.
### 6.033. Version `1.x.y` — Wallets and preparatory swaps
### 6.040. Version `1.x.y` — Wallets and preparatory swaps
Goal: prepare the action layer.
To do:
@@ -477,7 +548,7 @@ Goal: prepare the action layer.
- preparation of orders and swaps,
- simulation and safeguards.
### 6.034. Version `2.x.y` — Semi-automated trading
### 6.041. Version `2.x.y` — Semi-automated trading
Goal: wire analysis to action while keeping explicit safeguards.
To do:
@@ -488,7 +559,7 @@ Goal: wire analysis to action while keeping explicit safeguards.
- explicit or semi-automatic confirmations,
- execution logs.
### 6.035. Version `3.x.y` — Yellowstone gRPC
### 6.042. Version `3.x.y` — Yellowstone gRPC
Goal: add the dedicated gRPC connector.
To do:

View File

@@ -8,19 +8,22 @@
#![deny(unreachable_pub)]
#![warn(missing_docs)]
mod config;
mod constants;
mod error;
mod http_client;
mod config;
mod tracing;
mod types;
mod ws_client;
mod json_rpc_ws;
mod solana_pubsub_ws;
mod ws_manager;
mod http_client;
mod http_pool;
mod db;
mod detect;
pub use crate::constants::*;
pub use crate::error::KbError;
pub use crate::config::KbAppConfig;
pub use crate::config::KbConfig;
pub use crate::config::KbDataConfig;
@@ -30,18 +33,6 @@ pub use crate::config::KbSolanaConfig;
pub use crate::config::KbWsEndpointConfig;
pub use crate::config::KbDatabaseConfig;
pub use crate::config::KbSqliteDatabaseConfig;
pub use crate::constants::*;
pub use crate::error::KbError;
pub use crate::http_client::HttpClient;
pub use crate::http_client::KbJsonRpcHttpErrorObject;
pub use crate::http_client::KbJsonRpcHttpErrorResponse;
pub use crate::http_client::KbJsonRpcHttpRequest;
pub use crate::http_client::KbJsonRpcHttpResponse;
pub use crate::http_client::KbJsonRpcHttpSuccessResponse;
pub use crate::http_client::KbHttpEndpointStatus;
pub use crate::http_client::KbHttpMethodClass;
pub use crate::http_client::parse_kb_json_rpc_http_response_text;
pub use crate::http_client::parse_kb_json_rpc_http_response_value;
pub use crate::tracing::KbTracingGuard;
pub use crate::tracing::init_tracing;
pub use crate::types::KbConnectionState;
@@ -62,6 +53,19 @@ pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use crate::solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use crate::ws_manager::WsManagedEndpointSnapshot;
pub use crate::ws_manager::WsManager;
pub use crate::ws_manager::WsManagerSnapshot;
pub use crate::http_client::HttpClient;
pub use crate::http_client::KbJsonRpcHttpErrorObject;
pub use crate::http_client::KbJsonRpcHttpErrorResponse;
pub use crate::http_client::KbJsonRpcHttpRequest;
pub use crate::http_client::KbJsonRpcHttpResponse;
pub use crate::http_client::KbJsonRpcHttpSuccessResponse;
pub use crate::http_client::KbHttpEndpointStatus;
pub use crate::http_client::KbHttpMethodClass;
pub use crate::http_client::parse_kb_json_rpc_http_response_text;
pub use crate::http_client::parse_kb_json_rpc_http_response_value;
pub use crate::http_pool::HttpEndpointPool;
pub use crate::http_pool::KbHttpPoolClientSnapshot;
pub use crate::db::KbDatabase;

kb_lib/src/ws_manager.rs (new file, 956 lines)
View File

@@ -0,0 +1,956 @@
// file: kb_lib/src/ws_manager.rs
//! Multi-client WebSocket orchestration.
//!
//! This module provides a thin orchestration layer above [`crate::WsClient`].
//! It builds and manages multiple WebSocket clients, republishes a unified
//! event stream, and can attach one shared detection relay to all managed
//! clients.
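//!
//! A minimal usage sketch (assuming the library crate is consumed as
//! `kb_lib` and a Tokio runtime is already running):
//!
//! ```no_run
//! # async fn demo(config: &kb_lib::KbConfig) -> Result<(), kb_lib::KbError> {
//! let manager = kb_lib::WsManager::from_config(config)?;
//! // Start only the endpoints configured with the "slots" role.
//! manager.start_role("slots").await?;
//! // Consolidated view: state and active subscription count per endpoint.
//! let snapshot = manager.snapshot().await?;
//! assert_eq!(snapshot.endpoint_count, snapshot.endpoints.len());
//! manager.stop_all().await?;
//! # Ok(())
//! # }
//! ```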
/// Snapshot of one managed endpoint.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WsManagedEndpointSnapshot {
/// Endpoint logical name.
pub endpoint_name: std::string::String,
/// Endpoint resolved URL.
pub resolved_url: std::string::String,
/// Endpoint provider name.
pub provider: std::string::String,
/// Current connection state.
pub state: crate::KbConnectionState,
/// Number of active subscriptions currently tracked by the client.
pub active_subscription_count: usize,
}
/// Snapshot of the whole manager state.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WsManagerSnapshot {
/// Number of managed endpoints.
pub endpoint_count: usize,
/// Number of endpoints whose state is not `Disconnected`.
pub started_count: usize,
/// Per-endpoint snapshot list.
pub endpoints: std::vec::Vec<WsManagedEndpointSnapshot>,
}
#[derive(Debug)]
struct WsManagedClient {
client: crate::WsClient,
event_forward_abort_handle: tokio::task::AbortHandle,
}
/// Multi-client WebSocket orchestrator.
#[derive(Debug)]
pub struct WsManager {
clients: tokio::sync::Mutex<std::collections::BTreeMap<std::string::String, WsManagedClient>>,
event_tx: tokio::sync::broadcast::Sender<crate::WsEvent>,
detection_relay_sender: tokio::sync::Mutex<
std::option::Option<tokio::sync::mpsc::Sender<crate::KbWsDetectionNotificationEnvelope>>,
>,
detection_relay_abort_handle: tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
}
impl WsManager {
/// Builds one manager from a slice of WebSocket endpoint configurations.
///
/// Only enabled endpoints are retained.
pub fn from_ws_endpoints(
endpoints: &[crate::KbWsEndpointConfig],
) -> Result<Self, crate::KbError> {
let mut selected_endpoints = std::vec::Vec::new();
// The unified broadcast channel takes the largest configured per-endpoint
// capacity, and at least 1 as required by `tokio::sync::broadcast::channel`.
let mut max_event_capacity = 1_usize;
for endpoint in endpoints {
if endpoint.enabled {
selected_endpoints.push(endpoint.clone());
if endpoint.event_channel_capacity > max_event_capacity {
max_event_capacity = endpoint.event_channel_capacity;
}
}
}
let channel_result = tokio::sync::broadcast::channel(max_event_capacity);
let (event_tx, _) = channel_result;
let mut clients = std::collections::BTreeMap::new();
for endpoint in selected_endpoints {
if clients.contains_key(endpoint.name.as_str()) {
return Err(crate::KbError::Config(format!(
"duplicate websocket endpoint name '{}'",
endpoint.name
)));
}
let client_result = crate::WsClient::new(endpoint.clone());
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
let event_forward_abort_handle =
spawn_event_forward_task(client.clone(), event_tx.clone());
clients.insert(
endpoint.name.clone(),
WsManagedClient {
client,
event_forward_abort_handle,
},
);
}
Ok(Self {
clients: tokio::sync::Mutex::new(clients),
event_tx,
detection_relay_sender: tokio::sync::Mutex::new(None),
detection_relay_abort_handle: tokio::sync::Mutex::new(None),
})
}
/// Builds one manager from the application configuration.
pub fn from_config(config: &crate::KbConfig) -> Result<Self, crate::KbError> {
Self::from_ws_endpoints(&config.solana.ws_endpoints)
}
/// Returns a unified broadcast receiver for all managed client events.
pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<crate::WsEvent> {
self.event_tx.subscribe()
}
/// Returns the list of managed endpoint names.
pub async fn endpoint_names(&self) -> std::vec::Vec<std::string::String> {
let clients_guard = self.clients.lock().await;
clients_guard.keys().cloned().collect()
}
/// Returns the list of managed endpoint names having the requested role.
pub async fn endpoint_names_for_role(&self, role: &str) -> std::vec::Vec<std::string::String> {
let clients_guard = self.clients.lock().await;
let mut endpoint_names = std::vec::Vec::new();
for (endpoint_name, managed) in &*clients_guard {
if managed
.client
.endpoint_config()
.roles
.iter()
.any(|configured_role| configured_role == role)
{
endpoint_names.push(endpoint_name.clone());
}
}
endpoint_names
}
/// Starts all managed endpoints having the requested role.
pub async fn start_role(&self, role: &str) -> Result<usize, crate::KbError> {
let endpoint_names = self.endpoint_names_for_role(role).await;
let mut started_count = 0_usize;
for endpoint_name in endpoint_names {
let start_result = self.start_endpoint(endpoint_name.as_str()).await;
if let Err(error) = start_result {
return Err(error);
}
started_count += 1;
}
Ok(started_count)
}
/// Stops all managed endpoints having the requested role.
pub async fn stop_role(&self, role: &str) -> Result<usize, crate::KbError> {
let endpoint_names = self.endpoint_names_for_role(role).await;
let mut stopped_count = 0_usize;
for endpoint_name in endpoint_names {
let stop_result = self.stop_endpoint(endpoint_name.as_str()).await;
if let Err(error) = stop_result {
return Err(error);
}
stopped_count += 1;
}
Ok(stopped_count)
}
/// Returns one managed client by endpoint name.
pub async fn client(&self, endpoint_name: &str) -> std::option::Option<crate::WsClient> {
let clients_guard = self.clients.lock().await;
let managed_option = clients_guard.get(endpoint_name);
match managed_option {
Some(managed) => Some(managed.client.clone()),
None => None,
}
}
/// Starts one managed endpoint.
pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
None => {
return Err(crate::KbError::InvalidState(format!(
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
};
let sender_option = {
let sender_guard = self.detection_relay_sender.lock().await;
sender_guard.clone()
};
if let Some(sender) = sender_option {
client.set_detection_notification_forwarder(sender).await;
}
let connect_result = client.connect().await;
if let Err(error) = connect_result {
return Err(error);
}
Ok(())
}
/// Stops one managed endpoint.
pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
None => {
return Err(crate::KbError::InvalidState(format!(
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
};
client.clear_detection_notification_forwarder().await;
let disconnect_result = client.disconnect().await;
if let Err(error) = disconnect_result {
return Err(error);
}
Ok(())
}
/// Starts all managed endpoints.
pub async fn start_all(&self) -> Result<usize, crate::KbError> {
let endpoint_names = self.endpoint_names().await;
let mut started_count = 0_usize;
for endpoint_name in endpoint_names {
let start_result = self.start_endpoint(endpoint_name.as_str()).await;
if let Err(error) = start_result {
return Err(error);
}
started_count += 1;
}
Ok(started_count)
}
/// Stops all managed endpoints.
pub async fn stop_all(&self) -> Result<usize, crate::KbError> {
let endpoint_names = self.endpoint_names().await;
let mut stopped_count = 0_usize;
for endpoint_name in endpoint_names {
let stop_result = self.stop_endpoint(endpoint_name.as_str()).await;
if let Err(error) = stop_result {
return Err(error);
}
stopped_count += 1;
}
Ok(stopped_count)
}
/// Returns the number of active subscriptions for one endpoint.
pub async fn active_subscription_count(
&self,
endpoint_name: &str,
) -> Result<usize, crate::KbError> {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
None => {
return Err(crate::KbError::InvalidState(format!(
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
};
Ok(client.active_subscription_count().await)
}
/// Returns a consolidated snapshot of all managed endpoints.
pub async fn snapshot(&self) -> Result<WsManagerSnapshot, crate::KbError> {
let clients_to_snapshot = {
let clients_guard = self.clients.lock().await;
let mut values = std::vec::Vec::new();
for (endpoint_name, managed) in &*clients_guard {
values.push((endpoint_name.clone(), managed.client.clone()));
}
values
};
let mut endpoints = std::vec::Vec::new();
let mut started_count = 0_usize;
for (endpoint_name, client) in clients_to_snapshot {
let state = client.connection_state().await;
let active_subscription_count = client.active_subscription_count().await;
if state != crate::KbConnectionState::Disconnected {
started_count += 1;
}
endpoints.push(WsManagedEndpointSnapshot {
endpoint_name,
resolved_url: client.endpoint_url().to_string(),
provider: client.endpoint_config().provider.clone(),
state,
active_subscription_count,
});
}
Ok(WsManagerSnapshot {
endpoint_count: endpoints.len(),
started_count,
endpoints,
})
}
/// Attaches one shared detection relay to all managed clients.
pub async fn attach_detection_relay(
&self,
database: std::sync::Arc<crate::KbDatabase>,
queue_capacity: usize,
) -> Result<(), crate::KbError> {
{
let sender_guard = self.detection_relay_sender.lock().await;
if sender_guard.is_some() {
return Err(crate::KbError::InvalidState(
"websocket detection relay is already attached".to_string(),
));
}
}
let persistence = crate::KbDetectionPersistenceService::new(database);
let detector = crate::KbSolanaWsDetectionService::new(persistence);
let relay = crate::KbWsDetectionRelay::new(detector);
let (sender, receiver) = crate::KbWsDetectionRelay::channel(queue_capacity);
let relay_task = relay.spawn(receiver);
let relay_abort_handle = relay_task.abort_handle();
{
let mut sender_guard = self.detection_relay_sender.lock().await;
*sender_guard = Some(sender.clone());
}
{
let mut abort_guard = self.detection_relay_abort_handle.lock().await;
*abort_guard = Some(relay_abort_handle);
}
let clients = {
let clients_guard = self.clients.lock().await;
let mut values = std::vec::Vec::new();
for managed in clients_guard.values() {
values.push(managed.client.clone());
}
values
};
for client in clients {
client
.set_detection_notification_forwarder(sender.clone())
.await;
}
Ok(())
}
/// Detaches the shared detection relay from all managed clients.
pub async fn detach_detection_relay(&self) -> Result<(), crate::KbError> {
let clients = {
let clients_guard = self.clients.lock().await;
let mut values = std::vec::Vec::new();
for managed in clients_guard.values() {
values.push(managed.client.clone());
}
values
};
for client in clients {
client.clear_detection_notification_forwarder().await;
}
{
let mut sender_guard = self.detection_relay_sender.lock().await;
*sender_guard = None;
}
let abort_handle_option = {
let mut abort_guard = self.detection_relay_abort_handle.lock().await;
abort_guard.take()
};
if let Some(abort_handle) = abort_handle_option {
abort_handle.abort();
}
Ok(())
}
/// Returns whether one managed endpoint exists.
pub async fn has_endpoint(&self, endpoint_name: &str) -> bool {
let clients_guard = self.clients.lock().await;
clients_guard.contains_key(endpoint_name)
}
/// Returns the current connection state of one endpoint.
pub async fn endpoint_state(
&self,
endpoint_name: &str,
) -> Result<crate::KbConnectionState, crate::KbError> {
let client_option = self.client(endpoint_name).await;
let client = match client_option {
Some(client) => client,
None => {
return Err(crate::KbError::InvalidState(format!(
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
};
Ok(client.connection_state().await)
}
}
impl Drop for WsManager {
fn drop(&mut self) {
// `drop` is synchronous, so use `try_lock` for a best-effort abort of the
// forwarding and relay tasks without blocking.
let clients_result = self.clients.try_lock();
if let Ok(mut clients_guard) = clients_result {
for managed in clients_guard.values_mut() {
managed.event_forward_abort_handle.abort();
}
}
let relay_abort_result = self.detection_relay_abort_handle.try_lock();
if let Ok(mut abort_guard) = relay_abort_result {
if let Some(abort_handle) = abort_guard.take() {
abort_handle.abort();
}
}
}
}
fn spawn_event_forward_task(
client: crate::WsClient,
event_tx: tokio::sync::broadcast::Sender<crate::WsEvent>,
) -> tokio::task::AbortHandle {
let mut receiver = client.subscribe_events();
let task = tokio::spawn(async move {
loop {
let recv_result = receiver.recv().await;
match recv_result {
Ok(event) => {
let _ = event_tx.send(event);
}
// A lagged receiver only missed events; keep forwarding.
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
// All client senders are gone: end the forwarding task.
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
});
task.abort_handle()
}
#[cfg(test)]
mod tests {
#[derive(Debug)]
struct TestWsServer {
url: std::string::String,
shutdown_tx: std::option::Option<tokio::sync::oneshot::Sender<()>>,
}
impl TestWsServer {
async fn spawn_echo_server() -> Self {
let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
let listener = match listener_result {
Ok(listener) => listener,
Err(error) => panic!("tcp bind failed: {error}"),
};
let local_addr_result = listener.local_addr();
let local_addr = match local_addr_result {
Ok(local_addr) => local_addr,
Err(error) => panic!("local_addr failed: {error}"),
};
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
accept_result = listener.accept() => {
let (stream, _) = match accept_result {
Ok(pair) => pair,
Err(_) => break,
};
tokio::spawn(async move {
let accept_result = tokio_tungstenite::accept_async(stream).await;
let mut websocket = match accept_result {
Ok(websocket) => websocket,
Err(_) => return,
};
loop {
let next_result = futures_util::StreamExt::next(&mut websocket).await;
let message_result = match next_result {
Some(message_result) => message_result,
None => break,
};
let message = match message_result {
Ok(message) => message,
Err(_) => break,
};
match message {
tokio_tungstenite::tungstenite::Message::Text(text) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(text),
).await;
if send_result.is_err() {
break;
}
}
tokio_tungstenite::tungstenite::Message::Ping(data) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Pong(data),
).await;
if send_result.is_err() {
break;
}
}
tokio_tungstenite::tungstenite::Message::Close(frame) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Close(frame),
).await;
let _ = send_result;
break;
}
tokio_tungstenite::tungstenite::Message::Binary(_) => {}
tokio_tungstenite::tungstenite::Message::Pong(_) => {}
tokio_tungstenite::tungstenite::Message::Frame(_) => {}
}
}
});
}
}
}
});
Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
}
async fn spawn_json_rpc_server() -> Self {
let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
let listener = match listener_result {
Ok(listener) => listener,
Err(error) => panic!("tcp bind failed: {error}"),
};
let local_addr_result = listener.local_addr();
let local_addr = match local_addr_result {
Ok(local_addr) => local_addr,
Err(error) => panic!("local_addr failed: {error}"),
};
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
accept_result = listener.accept() => {
let (stream, _) = match accept_result {
Ok(pair) => pair,
Err(_) => break,
};
tokio::spawn(async move {
let accept_result = tokio_tungstenite::accept_async(stream).await;
let mut websocket = match accept_result {
Ok(websocket) => websocket,
Err(_) => return,
};
loop {
let next_result = futures_util::StreamExt::next(&mut websocket).await;
let message_result = match next_result {
Some(message_result) => message_result,
None => break,
};
let message = match message_result {
Ok(message) => message,
Err(_) => break,
};
match message {
tokio_tungstenite::tungstenite::Message::Text(text) => {
let parse_result: Result<serde_json::Value, _> =
serde_json::from_str(text.as_str());
let request_value = match parse_result {
Ok(request_value) => request_value,
Err(_) => continue,
};
let method_option = request_value
.get("method")
.and_then(serde_json::Value::as_str);
let method = match method_option {
Some(method) => method,
None => continue,
};
if method == "slotSubscribe" {
let id_value = match request_value.get("id") {
Some(id_value) => id_value.clone(),
None => serde_json::Value::from(1_u64),
};
let success_text = serde_json::json!({
"jsonrpc": "2.0",
"result": 77_u64,
"id": id_value,
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
).await;
if send_result.is_err() {
break;
}
let notification_text = serde_json::json!({
"jsonrpc": "2.0",
"method": "slotNotification",
"params": {
"result": {
"slot": 12_u64,
"parent": 11_u64,
"root": 10_u64
},
"subscription": 77_u64
}
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
).await;
if send_result.is_err() {
break;
}
} else if method == "slotUnsubscribe" {
let id_value = match request_value.get("id") {
Some(id_value) => id_value.clone(),
None => serde_json::Value::from(2_u64),
};
let success_text = serde_json::json!({
"jsonrpc": "2.0",
"result": true,
"id": id_value,
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
).await;
if send_result.is_err() {
break;
}
}
}
tokio_tungstenite::tungstenite::Message::Ping(data) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Pong(data),
).await;
if send_result.is_err() {
break;
}
}
tokio_tungstenite::tungstenite::Message::Close(frame) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Close(frame),
).await;
let _ = send_result;
break;
}
tokio_tungstenite::tungstenite::Message::Binary(_) => {}
tokio_tungstenite::tungstenite::Message::Pong(_) => {}
tokio_tungstenite::tungstenite::Message::Frame(_) => {}
}
}
});
}
}
}
});
Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
fn make_ws_endpoint(
name: &str,
url: std::string::String,
enabled: bool,
) -> crate::KbWsEndpointConfig {
crate::KbWsEndpointConfig {
name: name.to_string(),
enabled,
provider: "test".to_string(),
url,
api_key_env_var: None,
roles: vec!["test".to_string()],
max_subscriptions: 16,
connect_timeout_ms: 2000,
request_timeout_ms: 2000,
unsubscribe_timeout_ms: 1000,
write_channel_capacity: 32,
event_channel_capacity: 64,
auto_reconnect: false,
}
}
async fn recv_manager_event(
receiver: &mut tokio::sync::broadcast::Receiver<crate::WsEvent>,
) -> crate::WsEvent {
let timeout_result =
tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await;
let recv_result = match timeout_result {
Ok(recv_result) => recv_result,
Err(_) => panic!("manager event receive timeout"),
};
match recv_result {
Ok(event) => event,
Err(error) => panic!("manager event receive failed: {error}"),
}
}
async fn create_database() -> crate::KbDatabase {
let tempdir_result = tempfile::tempdir();
let tempdir = match tempdir_result {
Ok(tempdir) => tempdir,
Err(error) => panic!("tempdir failed: {error}"),
};
let database_path = tempdir.path().join("ws_manager.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database_result = crate::KbDatabase::connect_and_initialize(&config).await;
match database_result {
Ok(database) => database,
Err(error) => panic!("database init failed: {error}"),
}
}
#[tokio::test]
async fn from_ws_endpoints_builds_only_enabled_clients() {
let endpoints = vec![
make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true),
make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false),
make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true),
];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let endpoint_names = manager.endpoint_names().await;
assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]);
}
#[tokio::test]
async fn start_all_connects_all_clients_and_republishes_events() {
let server_a = TestWsServer::spawn_echo_server().await;
let server_b = TestWsServer::spawn_echo_server().await;
let endpoints = vec![
make_ws_endpoint("ws_a", server_a.url.clone(), true),
make_ws_endpoint("ws_b", server_b.url.clone(), true),
];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let mut receiver = manager.subscribe_events();
let start_result = manager.start_all().await;
let started_count = match start_result {
Ok(started_count) => started_count,
Err(error) => panic!("start_all failed: {error}"),
};
assert_eq!(started_count, 2);
let mut connected_names = std::collections::BTreeSet::new();
for _ in 0..4 {
let event = recv_manager_event(&mut receiver).await;
if let crate::WsEvent::Connected { endpoint_name, .. } = event {
connected_names.insert(endpoint_name);
}
if connected_names.len() == 2 {
break;
}
}
assert!(connected_names.contains("ws_a"));
assert!(connected_names.contains("ws_b"));
let stop_result = manager.stop_all().await;
if let Err(error) = stop_result {
panic!("stop_all failed: {error}");
}
server_a.shutdown().await;
server_b.shutdown().await;
}
#[tokio::test]
async fn snapshot_reports_states() {
let server = TestWsServer::spawn_echo_server().await;
let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let start_result = manager.start_all().await;
if let Err(error) = start_result {
panic!("start_all failed: {error}");
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let snapshot_result = manager.snapshot().await;
let snapshot = match snapshot_result {
Ok(snapshot) => snapshot,
Err(error) => panic!("snapshot failed: {error}"),
};
assert_eq!(snapshot.endpoint_count, 1);
assert_eq!(snapshot.started_count, 1);
assert_eq!(snapshot.endpoints.len(), 1);
assert_eq!(snapshot.endpoints[0].endpoint_name, "ws_a");
assert_eq!(
snapshot.endpoints[0].state,
crate::KbConnectionState::Connected
);
let stop_result = manager.stop_all().await;
if let Err(error) = stop_result {
panic!("stop_all failed: {error}");
}
server.shutdown().await;
}
#[tokio::test]
async fn attach_detection_relay_allows_managed_client_forwarding() {
let server = TestWsServer::spawn_json_rpc_server().await;
let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let database = create_database().await;
let database = std::sync::Arc::new(database);
let attach_result = manager.attach_detection_relay(database.clone(), 16).await;
if let Err(error) = attach_result {
panic!("attach_detection_relay failed: {error}");
}
let start_result = manager.start_all().await;
if let Err(error) = start_result {
panic!("start_all failed: {error}");
}
let client_option = manager.client("ws_a").await;
let client = match client_option {
Some(client) => client,
None => panic!("client must exist"),
};
let subscribe_result = client
.send_json_rpc_request("slotSubscribe".to_string(), std::vec::Vec::new())
.await;
if let Err(error) = subscribe_result {
panic!("slotSubscribe failed: {error}");
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let observations_result =
crate::list_recent_onchain_observations(database.as_ref(), 10).await;
let observations = match observations_result {
Ok(observations) => observations,
Err(error) => panic!("list_recent_onchain_observations failed: {error}"),
};
assert_eq!(observations.len(), 1);
assert_eq!(observations[0].observation_kind, "ws.slot_notification");
let stop_result = manager.stop_all().await;
if let Err(error) = stop_result {
panic!("stop_all failed: {error}");
}
let detach_result = manager.detach_detection_relay().await;
if let Err(error) = detach_result {
panic!("detach_detection_relay failed: {error}");
}
server.shutdown().await;
}
#[tokio::test]
async fn from_config_builds_only_enabled_clients() {
let config = crate::KbConfig {
app: crate::KbAppConfig {
name: "test".to_string(),
environment: "test".to_string(),
auto_reconnect_default: false,
},
logging: crate::KbLoggingConfig {
level: "debug".to_string(),
console_enabled: true,
console_ansi: true,
file_enabled: false,
directory: "./logs".to_string(),
file_prefix: "app".to_string(),
rotation: "daily".to_string(),
message_format: "compact".to_string(),
time_format: "rfc3339_millis".to_string(),
target_filters: std::collections::BTreeMap::new(),
},
data: crate::KbDataConfig {
sqlite_path: "data/test.sqlite3".to_string(),
wallets_directory: "wallets".to_string(),
},
solana: crate::KbSolanaConfig {
http_endpoints: std::vec::Vec::new(),
ws_endpoints: vec![
make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true),
make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false),
make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true),
],
},
database: crate::KbDatabaseConfig {
enabled: false,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: "data/test.sqlite3".to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: false,
use_wal: false,
},
},
};
let manager_result = crate::WsManager::from_config(&config);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_config failed: {error}"),
};
let endpoint_names = manager.endpoint_names().await;
assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]);
}
#[tokio::test]
async fn endpoint_names_for_role_filters_managed_clients() {
let mut ws_a = make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true);
ws_a.roles = vec!["slots".to_string(), "general".to_string()];
let mut ws_b = make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), true);
ws_b.roles = vec!["programs".to_string()];
let mut ws_c = make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true);
ws_c.roles = vec!["slots".to_string()];
let endpoints = vec![ws_a, ws_b, ws_c];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let slot_endpoints = manager.endpoint_names_for_role("slots").await;
let program_endpoints = manager.endpoint_names_for_role("programs").await;
let unknown_endpoints = manager.endpoint_names_for_role("unknown").await;
assert_eq!(slot_endpoints, vec!["ws_a".to_string(), "ws_c".to_string()]);
assert_eq!(program_endpoints, vec!["ws_b".to_string()]);
assert!(unknown_endpoints.is_empty());
}
}