// file: kb_lib/src/ws_manager.rs //! Multi-client WebSocket orchestration. //! //! This module provides a thin orchestration layer above [`crate::WsClient`]. //! It builds and manages multiple WebSocket clients, republishes a unified //! event stream, and can attach one shared detection relay to all managed //! clients. /// Snapshot of one managed endpoint. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct WsManagedEndpointSnapshot { /// Endpoint logical name. pub endpoint_name: std::string::String, /// Endpoint resolved URL. pub resolved_url: std::string::String, /// Endpoint provider name. pub provider: std::string::String, /// Current connection state. pub state: crate::ConnectionState, /// Number of active subscriptions currently tracked by the client. pub active_subscription_count: usize, } /// Snapshot of the whole manager state. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct WsManagerSnapshot { /// Number of managed endpoints. pub endpoint_count: usize, /// Number of endpoints whose state is not `Disconnected`. pub started_count: usize, /// Per-endpoint snapshot list. pub endpoints: std::vec::Vec, } #[derive(Debug)] struct WsManagedClient { client: crate::WsClient, event_forward_abort_handle: tokio::task::AbortHandle, } /// Multi-client WebSocket orchestrator. #[derive(Debug)] pub struct WsManager { clients: tokio::sync::Mutex>, event_tx: tokio::sync::broadcast::Sender, detection_relay_sender: tokio::sync::Mutex< std::option::Option>, >, detection_relay_abort_handle: tokio::sync::Mutex>, transaction_resolution_relay_sender: tokio::sync::Mutex< std::option::Option>, >, transaction_resolution_relay_abort_handle: tokio::sync::Mutex>, hybrid_observation_relay_abort_handle: tokio::sync::Mutex>, } impl WsManager { /// Builds one manager from a slice of WebSocket endpoint configurations. /// /// Only enabled endpoints are retained. pub fn from_ws_endpoints(endpoints: &[crate::WsEndpointConfig]) -> Result { let mut selected_endpoints = std::vec::Vec::new(); let mut max_event_capacity = 1_usize; for endpoint in endpoints { if endpoint.enabled { selected_endpoints.push(endpoint.clone()); if endpoint.event_channel_capacity > max_event_capacity { max_event_capacity = endpoint.event_channel_capacity; } } } let channel_result = tokio::sync::broadcast::channel(max_event_capacity); let (event_tx, _) = channel_result; let mut clients = std::collections::BTreeMap::new(); for endpoint in selected_endpoints { if clients.contains_key(endpoint.name.as_str()) { return Err(crate::Error::Config(format!( "duplicate websocket endpoint name '{}'", endpoint.name ))); } let client_result = crate::WsClient::new(endpoint.clone()); let client = match client_result { Ok(client) => client, Err(error) => return Err(error), }; let event_forward_abort_handle = spawn_event_forward_task(client.clone(), event_tx.clone()); clients.insert( endpoint.name.clone(), WsManagedClient { client, event_forward_abort_handle }, ); } return Ok(Self { clients: tokio::sync::Mutex::new(clients), event_tx, detection_relay_sender: tokio::sync::Mutex::new(None), detection_relay_abort_handle: tokio::sync::Mutex::new(None), transaction_resolution_relay_sender: tokio::sync::Mutex::new(None), transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None), hybrid_observation_relay_abort_handle: tokio::sync::Mutex::new(None), }); } /// Builds one manager from the application configuration. pub fn from_config(config: &crate::Config) -> Result { return Self::from_ws_endpoints(&config.solana.ws_endpoints); } /// Returns a unified broadcast receiver for all managed client events. pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver { return self.event_tx.subscribe(); } /// Returns the list of managed endpoint names. pub async fn endpoint_names(&self) -> std::vec::Vec { let clients_guard = self.clients.lock().await; return clients_guard.keys().cloned().collect(); } /// Returns the list of managed endpoint names having the requested role. pub async fn endpoint_names_for_role(&self, role: &str) -> std::vec::Vec { let clients_guard = self.clients.lock().await; let mut endpoint_names = std::vec::Vec::new(); for (endpoint_name, managed) in &*clients_guard { if managed .client .endpoint_config() .roles .iter() .any(|configured_role| return configured_role == role) { endpoint_names.push(endpoint_name.clone()); } } return endpoint_names; } /// Starts all managed endpoints having the requested role. pub async fn start_role(&self, role: &str) -> Result { let endpoint_names = self.endpoint_names_for_role(role).await; let mut started_count = 0_usize; for endpoint_name in endpoint_names { let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await; let started = match start_result { Ok(started) => started, Err(error) => return Err(error), }; if started { started_count += 1; } } return Ok(started_count); } /// Stops all managed endpoints having the requested role. pub async fn stop_role(&self, role: &str) -> Result { let endpoint_names = self.endpoint_names_for_role(role).await; let mut stopped_count = 0_usize; for endpoint_name in endpoint_names { let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await; let stopped = match stop_result { Ok(stopped) => stopped, Err(error) => return Err(error), }; if stopped { stopped_count += 1; } } return Ok(stopped_count); } /// Returns one managed client by endpoint name. pub async fn client(&self, endpoint_name: &str) -> std::option::Option { let clients_guard = self.clients.lock().await; let managed_option = clients_guard.get(endpoint_name); match managed_option { Some(managed) => return Some(managed.client.clone()), None => return None, } } async fn start_endpoint_inner(&self, endpoint_name: &str) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, None => { return Err(crate::Error::InvalidState(format!( "unknown managed websocket endpoint '{}'", endpoint_name ))); }, }; let state = client.connection_state().await; if state == crate::ConnectionState::Connected || state == crate::ConnectionState::Connecting { return Ok(false); } let sender_option = { let sender_guard = self.detection_relay_sender.lock().await; sender_guard.clone() }; if let Some(sender) = sender_option { client.set_detection_notification_forwarder(sender).await; } let tx_resolution_sender_option = { let sender_guard = self.transaction_resolution_relay_sender.lock().await; sender_guard.clone() }; if let Some(sender) = tx_resolution_sender_option { client.set_transaction_resolution_notification_forwarder(sender).await; } let connect_result = client.connect().await; if let Err(error) = connect_result { return Err(error); } return Ok(true); } /// Starts one managed endpoint. pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::Error> { let start_result = self.start_endpoint_inner(endpoint_name).await; match start_result { Ok(_) => return Ok(()), Err(error) => return Err(error), } } async fn stop_endpoint_inner(&self, endpoint_name: &str) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, None => { return Err(crate::Error::InvalidState(format!( "unknown managed websocket endpoint '{}'", endpoint_name ))); }, }; let state = client.connection_state().await; if state == crate::ConnectionState::Disconnected || state == crate::ConnectionState::Disconnecting { return Ok(false); } client.clear_detection_notification_forwarder().await; client.clear_transaction_resolution_notification_forwarder().await; let disconnect_result = client.disconnect().await; if let Err(error) = disconnect_result { return Err(error); } return Ok(true); } /// Stops one managed endpoint. pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::Error> { let stop_result = self.stop_endpoint_inner(endpoint_name).await; match stop_result { Ok(_) => return Ok(()), Err(error) => return Err(error), } } /// Starts all managed endpoints. pub async fn start_all(&self) -> Result { let endpoint_names = self.endpoint_names().await; let mut started_count = 0_usize; for endpoint_name in endpoint_names { let start_result = self.start_endpoint_inner(endpoint_name.as_str()).await; let started = match start_result { Ok(started) => started, Err(error) => return Err(error), }; if started { started_count += 1; } } return Ok(started_count); } /// Stops all managed endpoints. pub async fn stop_all(&self) -> Result { let endpoint_names = self.endpoint_names().await; let mut stopped_count = 0_usize; for endpoint_name in endpoint_names { let stop_result = self.stop_endpoint_inner(endpoint_name.as_str()).await; let stopped = match stop_result { Ok(stopped) => stopped, Err(error) => return Err(error), }; if stopped { stopped_count += 1; } } return Ok(stopped_count); } /// Returns the number of active subscriptions for one endpoint. pub async fn active_subscription_count( &self, endpoint_name: &str, ) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, None => { return Err(crate::Error::InvalidState(format!( "unknown managed websocket endpoint '{}'", endpoint_name ))); }, }; return Ok(client.active_subscription_count().await); } /// Returns a consolidated snapshot of all managed endpoints. pub async fn snapshot(&self) -> Result { let clients_to_snapshot = { let clients_guard = self.clients.lock().await; let mut values = std::vec::Vec::new(); for (endpoint_name, managed) in &*clients_guard { values.push((endpoint_name.clone(), managed.client.clone())); } values }; let mut endpoints = std::vec::Vec::new(); let mut started_count = 0_usize; for (endpoint_name, client) in clients_to_snapshot { let state = client.connection_state().await; let active_subscription_count = client.active_subscription_count().await; if state != crate::ConnectionState::Disconnected { started_count += 1; } endpoints.push(WsManagedEndpointSnapshot { endpoint_name, resolved_url: client.endpoint_url().to_string(), provider: client.endpoint_config().provider.clone(), state, active_subscription_count, }); } return Ok(WsManagerSnapshot { endpoint_count: endpoints.len(), started_count, endpoints, }); } /// Attaches one shared detection relay to all managed clients. pub async fn attach_detection_relay( &self, database: std::sync::Arc, queue_capacity: usize, ) -> Result<(), crate::Error> { { let sender_guard = self.detection_relay_sender.lock().await; if sender_guard.is_some() { return Err(crate::Error::InvalidState( "websocket detection relay is already attached".to_string(), )); } } let persistence = crate::DetectionPersistenceService::new(database); let detector = crate::SolanaWsDetectionService::new(persistence); let relay = crate::WsDetectionRelay::new(detector); let (sender, receiver) = crate::WsDetectionRelay::channel(queue_capacity); let relay_task = relay.spawn(receiver); let relay_abort_handle = relay_task.abort_handle(); { let mut sender_guard = self.detection_relay_sender.lock().await; *sender_guard = Some(sender.clone()); } { let mut abort_guard = self.detection_relay_abort_handle.lock().await; *abort_guard = Some(relay_abort_handle); } let clients = { let clients_guard = self.clients.lock().await; let mut values = std::vec::Vec::new(); for managed in clients_guard.values() { values.push(managed.client.clone()); } values }; for client in clients { client.set_detection_notification_forwarder(sender.clone()).await; } return Ok(()); } /// Detaches the shared detection relay from all managed clients. pub async fn detach_detection_relay(&self) -> Result<(), crate::Error> { let clients = { let clients_guard = self.clients.lock().await; let mut values = std::vec::Vec::new(); for managed in clients_guard.values() { values.push(managed.client.clone()); } values }; for client in clients { client.clear_detection_notification_forwarder().await; } { let mut sender_guard = self.detection_relay_sender.lock().await; *sender_guard = None; } let abort_handle_option = { let mut abort_guard = self.detection_relay_abort_handle.lock().await; abort_guard.take() }; if let Some(abort_handle) = abort_handle_option { abort_handle.abort(); } return Ok(()); } /// Returns whether one managed endpoint exists. pub async fn has_endpoint(&self, endpoint_name: &str) -> bool { let clients_guard = self.clients.lock().await; return clients_guard.contains_key(endpoint_name); } /// Returns the current connection state of one endpoint. pub async fn endpoint_state( &self, endpoint_name: &str, ) -> Result { let client_option = self.client(endpoint_name).await; let client = match client_option { Some(client) => client, None => { return Err(crate::Error::InvalidState(format!( "unknown managed websocket endpoint '{}'", endpoint_name ))); }, }; return Ok(client.connection_state().await); } /// Attaches one shared transaction-resolution relay to all managed clients. pub async fn attach_transaction_resolution_relay( &self, database: std::sync::Arc, http_pool: std::sync::Arc, http_role: std::string::String, queue_capacity: usize, ) -> Result<(), crate::Error> { { let sender_guard = self.transaction_resolution_relay_sender.lock().await; if sender_guard.is_some() { return Err(crate::Error::InvalidState( "websocket transaction resolution relay is already attached".to_string(), )); } } let resolver = crate::TransactionResolutionService::new(http_pool, database, http_role); let relay = crate::WsTransactionResolutionRelay::new(resolver); let (sender, receiver) = crate::WsTransactionResolutionRelay::channel(queue_capacity); let relay_task = relay.spawn(receiver); let relay_abort_handle = relay_task.abort_handle(); { let mut sender_guard = self.transaction_resolution_relay_sender.lock().await; *sender_guard = Some(sender.clone()); } { let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await; *abort_guard = Some(relay_abort_handle); } let clients = { let clients_guard = self.clients.lock().await; let mut values = std::vec::Vec::new(); for managed in clients_guard.values() { values.push(managed.client.clone()); } values }; for client in clients { client.set_transaction_resolution_notification_forwarder(sender.clone()).await; } return Ok(()); } /// Detaches the shared transaction-resolution relay from all managed clients. pub async fn detach_transaction_resolution_relay(&self) -> Result<(), crate::Error> { let clients = { let clients_guard = self.clients.lock().await; let mut values = std::vec::Vec::new(); for managed in clients_guard.values() { values.push(managed.client.clone()); } values }; for client in clients { client.clear_transaction_resolution_notification_forwarder().await; } { let mut sender_guard = self.transaction_resolution_relay_sender.lock().await; *sender_guard = None; } let abort_handle_option = { let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await; abort_guard.take() }; if let Some(abort_handle) = abort_handle_option { abort_handle.abort(); } return Ok(()); } /// Collects the current hybrid WebSocket watch snapshot. pub async fn collect_hybrid_watch_snapshot( &self, database: std::sync::Arc, ) -> Result { let runtime = crate::WsHybridRuntimeService::new(database); return runtime.collect_watch_snapshot().await; } /// Attaches one shared hybrid observation relay to the manager event stream. pub async fn attach_hybrid_observation_relay( &self, database: std::sync::Arc, ) -> Result<(), crate::Error> { { let abort_guard = self.hybrid_observation_relay_abort_handle.lock().await; if abort_guard.is_some() { return Err(crate::Error::InvalidState( "websocket hybrid observation relay is already attached".to_string(), )); } } let runtime = crate::WsHybridRuntimeService::new(database); let receiver = self.subscribe_events(); let abort_handle = spawn_hybrid_observation_relay_task(receiver, runtime); { let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await; *abort_guard = Some(abort_handle); } return Ok(()); } /// Detaches the shared hybrid observation relay from the manager event stream. pub async fn detach_hybrid_observation_relay(&self) -> Result<(), crate::Error> { let abort_handle_option = { let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await; abort_guard.take() }; if let Some(abort_handle) = abort_handle_option { abort_handle.abort(); } return Ok(()); } } impl Drop for WsManager { fn drop(&mut self) { let clients_result = self.clients.try_lock(); if let Ok(mut clients_guard) = clients_result { for managed in clients_guard.values_mut() { managed.event_forward_abort_handle.abort(); } } let relay_abort_result = self.detection_relay_abort_handle.try_lock(); if let Ok(mut abort_guard) = relay_abort_result { if let Some(abort_handle) = abort_guard.take() { abort_handle.abort(); } } let tx_resolution_abort_result = self.transaction_resolution_relay_abort_handle.try_lock(); if let Ok(mut abort_guard) = tx_resolution_abort_result { if let Some(abort_handle) = abort_guard.take() { abort_handle.abort(); } } let hybrid_abort_result = self.hybrid_observation_relay_abort_handle.try_lock(); if let Ok(mut abort_guard) = hybrid_abort_result { if let Some(abort_handle) = abort_guard.take() { abort_handle.abort(); } } } } fn spawn_event_forward_task( client: crate::WsClient, event_tx: tokio::sync::broadcast::Sender, ) -> tokio::task::AbortHandle { let mut receiver = client.subscribe_events(); let task = tokio::spawn(async move { loop { let recv_result = receiver.recv().await; match recv_result { Ok(event) => { let _ = event_tx.send(event); }, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}, Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; }, } } }); return task.abort_handle(); } fn spawn_hybrid_observation_relay_task( mut receiver: tokio::sync::broadcast::Receiver, runtime: crate::WsHybridRuntimeService, ) -> tokio::task::AbortHandle { let task = tokio::spawn(async move { loop { let recv_result = receiver.recv().await; match recv_result { Ok(event) => { handle_hybrid_observation_manager_event(&runtime, event).await; }, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}, Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; }, } } }); return task.abort_handle(); } async fn handle_hybrid_observation_manager_event( runtime: &crate::WsHybridRuntimeService, event: crate::WsEvent, ) { match event { crate::WsEvent::SubscriptionNotification { endpoint_name, subscription, notification, .. } => { handle_hybrid_subscription_notification( runtime, endpoint_name, subscription, notification, ) .await; }, crate::WsEvent::JsonRpcNotificationWithoutSubscription { endpoint_name, notification } => { if notification.method == "logsNotification" { let value_result = serde_json::to_value(notification.clone()); let value = match value_result { Ok(value) => value, Err(error) => { tracing::warn!( target: "ws_manager", "cannot serialize logs notification for hybrid relay: {}", error ); return; }, }; let record_result = runtime.record_logs_notification(Some(endpoint_name), &value).await; if let Err(error) = record_result { tracing::warn!( target: "ws_manager", "hybrid logs observation failed: {}", error ); } } }, _ => {}, } } async fn handle_hybrid_subscription_notification( runtime: &crate::WsHybridRuntimeService, endpoint_name: std::string::String, subscription: crate::WsSubscriptionInfo, notification: crate::JsonRpcWsNotification, ) { let value_result = serde_json::to_value(notification.clone()); let value = match value_result { Ok(value) => value, Err(error) => { tracing::warn!( target: "ws_manager", "cannot serialize subscription notification for hybrid relay: {}", error ); return; }, }; let method = notification.method.as_str(); if method == "logsNotification" { let record_result = runtime.record_logs_notification(Some(endpoint_name), &value).await; if let Err(error) = record_result { tracing::warn!( target: "ws_manager", "hybrid logs observation failed: {}", error ); } return; } if method == "programNotification" { let watched_program_id = first_subscription_param_as_string(&subscription); let watched_program_id = match watched_program_id { Some(watched_program_id) => watched_program_id, None => { tracing::warn!( target: "ws_manager", "missing watched program id in subscription params" ); return; }, }; let record_result = runtime .record_program_notification(Some(endpoint_name), watched_program_id, &value) .await; if let Err(error) = record_result { tracing::warn!( target: "ws_manager", "hybrid program observation failed: {}", error ); } return; } if method == "accountNotification" { let watched_account = first_subscription_param_as_string(&subscription); let watched_account = match watched_account { Some(watched_account) => watched_account, None => { tracing::warn!( target: "ws_manager", "missing watched account in subscription params" ); return; }, }; let record_result = runtime .record_account_notification(Some(endpoint_name), watched_account, &value) .await; if let Err(error) = record_result { tracing::warn!( target: "kws_manager", "hybrid account observation failed: {}", error ); } } } fn first_subscription_param_as_string( subscription: &crate::WsSubscriptionInfo, ) -> std::option::Option { let first_param_option = subscription.params.first(); let first_param = match first_param_option { Some(first_param) => first_param, None => return None, }; let text_option = first_param.as_str(); match text_option { Some(text) => return Some(text.to_string()), None => return None, } } #[cfg(test)] mod tests { #[derive(Debug)] struct TestWsServer { url: std::string::String, shutdown_tx: std::option::Option>, } impl TestWsServer { async fn spawn_echo_server() -> Self { let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; let listener = match listener_result { Ok(listener) => listener, Err(error) => panic!("tcp bind failed: {error}"), }; let local_addr_result = listener.local_addr(); let local_addr = match local_addr_result { Ok(local_addr) => local_addr, Err(error) => panic!("local_addr failed: {error}"), }; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); tokio::spawn(async move { loop { tokio::select! { _ = &mut shutdown_rx => { break; } accept_result = listener.accept() => { let (stream, _) = match accept_result { Ok(pair) => pair, Err(_) => break, }; tokio::spawn(async move { let accept_result = tokio_tungstenite::accept_async(stream).await; let mut websocket = match accept_result { Ok(websocket) => websocket, Err(_) => return, }; loop { let next_result = futures_util::StreamExt::next(&mut websocket).await; let message_result = match next_result { Some(message_result) => message_result, None => break, }; let message = match message_result { Ok(message) => message, Err(_) => break, }; match message { tokio_tungstenite::tungstenite::Message::Text(text) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(text), ).await; if send_result.is_err() { break; } } tokio_tungstenite::tungstenite::Message::Ping(data) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Pong(data), ).await; if send_result.is_err() { break; } } tokio_tungstenite::tungstenite::Message::Close(frame) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Close(frame), ).await; let _ = send_result; break; } tokio_tungstenite::tungstenite::Message::Binary(_) => {} tokio_tungstenite::tungstenite::Message::Pong(_) => {} tokio_tungstenite::tungstenite::Message::Frame(_) => {} } } }); } } } }); return Self { url: format!("ws://{}", local_addr), shutdown_tx: Some(shutdown_tx), }; } async fn spawn_json_rpc_server() -> Self { let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; let listener = match listener_result { Ok(listener) => listener, Err(error) => panic!("tcp bind failed: {error}"), }; let local_addr_result = listener.local_addr(); let local_addr = match local_addr_result { Ok(local_addr) => local_addr, Err(error) => panic!("local_addr failed: {error}"), }; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); tokio::spawn(async move { loop { tokio::select! { _ = &mut shutdown_rx => { break; } accept_result = listener.accept() => { let (stream, _) = match accept_result { Ok(pair) => pair, Err(_) => break, }; tokio::spawn(async move { let accept_result = tokio_tungstenite::accept_async(stream).await; let mut websocket = match accept_result { Ok(websocket) => websocket, Err(_) => return, }; loop { let next_result = futures_util::StreamExt::next(&mut websocket).await; let message_result = match next_result { Some(message_result) => message_result, None => break, }; let message = match message_result { Ok(message) => message, Err(_) => break, }; match message { tokio_tungstenite::tungstenite::Message::Text(text) => { let parse_result: Result = serde_json::from_str(text.as_str()); let request_value = match parse_result { Ok(request_value) => request_value, Err(_) => continue, }; let method_option = request_value .get("method") .and_then(serde_json::Value::as_str); let method = match method_option { Some(method) => method, None => continue, }; if method == "slotSubscribe" { let id_value = match request_value.get("id") { Some(id_value) => id_value.clone(), None => serde_json::Value::from(1_u64), }; let success_text = serde_json::json!({ "jsonrpc": "2.0", "result": 77_u64, "id": id_value, }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(success_text.into()), ).await; if send_result.is_err() { break; } let notification_text = serde_json::json!({ "jsonrpc": "2.0", "method": "slotNotification", "params": { "result": { "slot": 12_u64, "parent": 11_u64, "root": 10_u64 }, "subscription": 77_u64 } }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(notification_text.into()), ).await; if send_result.is_err() { break; } } else if method == "slotUnsubscribe" { let id_value = match request_value.get("id") { Some(id_value) => id_value.clone(), None => serde_json::Value::from(2_u64), }; let success_text = serde_json::json!({ "jsonrpc": "2.0", "result": true, "id": id_value, }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(success_text.into()), ).await; if send_result.is_err() { break; } } } tokio_tungstenite::tungstenite::Message::Ping(data) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Pong(data), ).await; if send_result.is_err() { break; } } tokio_tungstenite::tungstenite::Message::Close(frame) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Close(frame), ).await; let _ = send_result; break; } tokio_tungstenite::tungstenite::Message::Binary(_) => {} tokio_tungstenite::tungstenite::Message::Pong(_) => {} tokio_tungstenite::tungstenite::Message::Frame(_) => {} } } }); } } } }); return Self { url: format!("ws://{}", local_addr), shutdown_tx: Some(shutdown_tx), }; } async fn spawn_hybrid_json_rpc_server() -> Self { let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; let listener = match listener_result { Ok(listener) => listener, Err(error) => panic!("tcp bind failed: {error}"), }; let local_addr_result = listener.local_addr(); let local_addr = match local_addr_result { Ok(local_addr) => local_addr, Err(error) => panic!("local_addr failed: {error}"), }; let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); tokio::spawn(async move { loop { tokio::select! { _ = &mut shutdown_rx => { break; } accept_result = listener.accept() => { let (stream, _) = match accept_result { Ok(pair) => pair, Err(_) => break, }; tokio::spawn(async move { let accept_result = tokio_tungstenite::accept_async(stream).await; let mut websocket = match accept_result { Ok(websocket) => websocket, Err(_) => return, }; loop { let next_result = futures_util::StreamExt::next(&mut websocket).await; let message_result = match next_result { Some(message_result) => message_result, None => break, }; let message = match message_result { Ok(message) => message, Err(_) => break, }; match message { tokio_tungstenite::tungstenite::Message::Text(text) => { let parse_result: Result = serde_json::from_str(text.as_str()); let request_value = match parse_result { Ok(request_value) => request_value, Err(_) => continue, }; let method_option = request_value .get("method") .and_then(serde_json::Value::as_str); let method = match method_option { Some(method) => method, None => continue, }; let id_value = match request_value.get("id") { Some(id_value) => id_value.clone(), None => serde_json::Value::from(1_u64), }; if method == "logsSubscribe" { let success_text = serde_json::json!({ "jsonrpc": "2.0", "result": 701_u64, "id": id_value, }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(success_text.into()), ).await; if send_result.is_err() { break; } let notification_text = serde_json::json!({ "jsonrpc": "2.0", "method": "logsNotification", "params": { "result": { "context": { "slot": 1001_u64 }, "value": { "signature": "HybridLogsSig111", "err": null, "logs": [ "Program log: Instruction: Swap" ] } }, "subscription": 701_u64 } }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(notification_text.into()), ).await; if send_result.is_err() { break; } } else if method == "programSubscribe" { let success_text = serde_json::json!({ "jsonrpc": "2.0", "result": 702_u64, "id": id_value, }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(success_text.into()), ).await; if send_result.is_err() { break; } let notification_text = serde_json::json!({ "jsonrpc": "2.0", "method": "programNotification", "params": { "result": { "context": { "slot": 1002_u64 }, "value": { "pubkey": "HybridProgramOwned111", "account": { "lamports": 1, "owner": "HybridProgram111" } } }, "subscription": 702_u64 } }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(notification_text.into()), ).await; if send_result.is_err() { break; } } else if method == "accountSubscribe" { let success_text = serde_json::json!({ "jsonrpc": "2.0", "result": 703_u64, "id": id_value, }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(success_text.into()), ).await; if send_result.is_err() { break; } let notification_text = serde_json::json!({ "jsonrpc": "2.0", "method": "accountNotification", "params": { "result": { "context": { "slot": 1003_u64 }, "value": { "lamports": 10, "owner": "HybridProgram111" } }, "subscription": 703_u64 } }).to_string(); let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Text(notification_text.into()), ).await; if send_result.is_err() { break; } } } tokio_tungstenite::tungstenite::Message::Ping(data) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Pong(data), ).await; if send_result.is_err() { break; } } tokio_tungstenite::tungstenite::Message::Close(frame) => { let send_result = futures_util::SinkExt::send( &mut websocket, tokio_tungstenite::tungstenite::Message::Close(frame), ).await; let _ = send_result; break; } tokio_tungstenite::tungstenite::Message::Binary(_) => {} tokio_tungstenite::tungstenite::Message::Pong(_) => {} tokio_tungstenite::tungstenite::Message::Frame(_) => {} } } }); } } } }); return Self { url: format!("ws://{}", local_addr), shutdown_tx: Some(shutdown_tx), }; } async fn shutdown(mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { let _ = shutdown_tx.send(()); } } } fn make_ws_endpoint( name: &str, url: std::string::String, enabled: bool, ) -> crate::WsEndpointConfig { return crate::WsEndpointConfig { name: name.to_string(), enabled, provider: "test".to_string(), url, api_key_env_var: None, roles: vec!["test".to_string()], max_subscriptions: 16, connect_timeout_ms: 2000, request_timeout_ms: 2000, unsubscribe_timeout_ms: 1000, write_channel_capacity: 32, event_channel_capacity: 64, auto_reconnect: false, }; } async fn recv_manager_event( receiver: &mut tokio::sync::broadcast::Receiver, ) -> crate::WsEvent { let timeout_result = tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await; let recv_result = match timeout_result { Ok(recv_result) => recv_result, Err(_) => panic!("manager event receive timeout"), }; match recv_result { Ok(event) => return event, Err(error) => panic!("manager event receive failed: {error}"), } } async fn create_database() -> crate::Database { let tempdir_result = tempfile::tempdir(); let tempdir = match tempdir_result { Ok(tempdir) => tempdir, Err(error) => panic!("tempdir failed: {error}"), }; let database_path = tempdir.path().join("ws_manager.sqlite3"); let config = crate::DatabaseConfig { enabled: true, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; let database_result = crate::Database::connect_and_initialize(&config).await; match database_result { Ok(database) => return database, Err(error) => panic!("database init failed: {error}"), } } #[tokio::test] async fn from_ws_endpoints_builds_only_enabled_clients() { let endpoints = vec![ make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true), make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false), make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true), ]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let endpoint_names = manager.endpoint_names().await; assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]); } #[tokio::test] async fn start_all_connects_all_clients_and_republishes_events() { let server_a = TestWsServer::spawn_echo_server().await; let server_b = TestWsServer::spawn_echo_server().await; let endpoints = vec![ make_ws_endpoint("ws_a", server_a.url.clone(), true), make_ws_endpoint("ws_b", server_b.url.clone(), true), ]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let mut receiver = manager.subscribe_events(); let start_result = manager.start_all().await; let started_count = match start_result { Ok(started_count) => started_count, Err(error) => panic!("start_all failed: {error}"), }; assert_eq!(started_count, 2); let mut connected_names = std::collections::BTreeSet::new(); for _ in 0..4 { let event = recv_manager_event(&mut receiver).await; if let crate::WsEvent::Connected { endpoint_name, .. } = event { connected_names.insert(endpoint_name); } if connected_names.len() == 2 { break; } } assert!(connected_names.contains("ws_a")); assert!(connected_names.contains("ws_b")); let stop_result = manager.stop_all().await; if let Err(error) = stop_result { panic!("stop_all failed: {error}"); } server_a.shutdown().await; server_b.shutdown().await; } #[tokio::test] async fn snapshot_reports_states() { let server = TestWsServer::spawn_echo_server().await; let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let start_result = manager.start_all().await; if let Err(error) = start_result { panic!("start_all failed: {error}"); } tokio::time::sleep(std::time::Duration::from_millis(50)).await; let snapshot_result = manager.snapshot().await; let snapshot = match snapshot_result { Ok(snapshot) => snapshot, Err(error) => panic!("snapshot failed: {error}"), }; assert_eq!(snapshot.endpoint_count, 1); assert_eq!(snapshot.started_count, 1); assert_eq!(snapshot.endpoints.len(), 1); assert_eq!(snapshot.endpoints[0].endpoint_name, "ws_a"); assert_eq!(snapshot.endpoints[0].state, crate::ConnectionState::Connected); let stop_result = manager.stop_all().await; if let Err(error) = stop_result { panic!("stop_all failed: {error}"); } server.shutdown().await; } #[tokio::test] async fn attach_detection_relay_allows_managed_client_forwarding() { let server = TestWsServer::spawn_json_rpc_server().await; let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let database = create_database().await; let database = std::sync::Arc::new(database); let attach_result = manager.attach_detection_relay(database.clone(), 16).await; if let Err(error) = attach_result { panic!("attach_detection_relay failed: {error}"); } let start_result = manager.start_all().await; if let Err(error) = start_result { panic!("start_all failed: {error}"); } let client_option = manager.client("ws_a").await; let client = match client_option { Some(client) => client, None => panic!("client must exist"), }; let subscribe_result = client .send_json_rpc_request("slotSubscribe".to_string(), std::vec::Vec::new()) .await; if let Err(error) = subscribe_result { panic!("slotSubscribe failed: {error}"); } tokio::time::sleep(std::time::Duration::from_millis(200)).await; let observations_result = crate::query_onchain_observations_list_recent(database.as_ref(), 10).await; let observations = match observations_result { Ok(observations) => observations, Err(error) => panic!("list_recent_onchain_observations failed: {error}"), }; assert_eq!(observations.len(), 1); assert_eq!(observations[0].observation_kind, "ws.slot_notification"); let stop_result = manager.stop_all().await; if let Err(error) = stop_result { panic!("stop_all failed: {error}"); } let detach_result = manager.detach_detection_relay().await; if let Err(error) = detach_result { panic!("detach_detection_relay failed: {error}"); } server.shutdown().await; } #[tokio::test] async fn from_config_builds_only_enabled_clients() { let config = crate::Config { app: crate::AppConfig { name: "test".to_string(), environment: "test".to_string(), auto_reconnect_default: false, }, logging: crate::LoggingConfig { level: "debug".to_string(), console_enabled: true, console_ansi: true, file_enabled: false, directory: "./logs".to_string(), file_prefix: "app".to_string(), rotation: "daily".to_string(), message_format: "compact".to_string(), time_format: "rfc3339_millis".to_string(), target_filters: std::collections::BTreeMap::new(), }, data: crate::DataConfig { sqlite_path: "data/test.sqlite3".to_string(), wallets_directory: "wallets".to_string(), }, solana: crate::SolanaConfig { http_endpoints: std::vec::Vec::new(), ws_endpoints: vec![ make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true), make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), false), make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true), ], }, database: crate::DatabaseConfig { enabled: false, backend: crate::DatabaseBackend::Sqlite, sqlite: crate::SqliteDatabaseConfig { path: "data/test.sqlite3".to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: false, use_wal: false, }, }, }; let manager_result = crate::WsManager::from_config(&config); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_config failed: {error}"), }; let endpoint_names = manager.endpoint_names().await; assert_eq!(endpoint_names, vec!["ws_a".to_string(), "ws_c".to_string()]); } #[tokio::test] async fn endpoint_names_for_role_filters_managed_clients() { let mut ws_a = make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true); ws_a.roles = vec!["slots".to_string(), "general".to_string()]; let mut ws_b = make_ws_endpoint("ws_b", "ws://127.0.0.1:2".to_string(), true); ws_b.roles = vec!["programs".to_string()]; let mut ws_c = make_ws_endpoint("ws_c", "ws://127.0.0.1:3".to_string(), true); ws_c.roles = vec!["slots".to_string()]; let endpoints = vec![ws_a, ws_b, ws_c]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let slot_endpoints = manager.endpoint_names_for_role("slots").await; let program_endpoints = manager.endpoint_names_for_role("programs").await; let unknown_endpoints = manager.endpoint_names_for_role("unknown").await; assert_eq!(slot_endpoints, vec!["ws_a".to_string(), "ws_c".to_string()]); assert_eq!(program_endpoints, vec!["ws_b".to_string()]); assert!(unknown_endpoints.is_empty()); } async fn seed_hybrid_watch_database(database: &crate::Database) { let dex_a = crate::DexDto::new( "raydium_amm_v4".to_string(), "Raydium AmmV4".to_string(), Some("HybridRaydiumProgram111".to_string()), None, true, ); let dex_a_id_result = crate::query_dexs_upsert(database, &dex_a).await; let dex_a_id = match dex_a_id_result { Ok(dex_a_id) => dex_a_id, Err(error) => panic!("dex A upsert failed: {error}"), }; let dex_b = crate::DexDto::new( "meteora_dbc".to_string(), "Meteora DBC".to_string(), Some("HybridMeteoraProgram111".to_string()), Some("HybridSharedRouter111".to_string()), true, ); let dex_b_id_result = crate::query_dexs_upsert(database, &dex_b).await; let dex_b_id = match dex_b_id_result { Ok(dex_b_id) => dex_b_id, Err(error) => panic!("dex B upsert failed: {error}"), }; let pool_a = crate::PoolDto::new( dex_a_id, "HybridPool111".to_string(), crate::PoolKind::Amm, crate::PoolStatus::Active, ); let pool_a_result = crate::query_pools_upsert(database, &pool_a).await; if let Err(error) = pool_a_result { panic!("pool A upsert failed: {error}"); } let pool_b = crate::PoolDto::new( dex_b_id, "HybridPool222".to_string(), crate::PoolKind::Amm, crate::PoolStatus::Inactive, ); let pool_b_result = crate::query_pools_upsert(database, &pool_b).await; if let Err(error) = pool_b_result { panic!("pool B upsert failed: {error}"); } } #[tokio::test] async fn collect_hybrid_watch_snapshot_reads_dexes_and_active_pools() { let endpoints = vec![make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true)]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let database = create_database().await; seed_hybrid_watch_database(&database).await; let database = std::sync::Arc::new(database); let snapshot_result = manager.collect_hybrid_watch_snapshot(database).await; let snapshot = match snapshot_result { Ok(snapshot) => snapshot, Err(error) => panic!("collect_hybrid_watch_snapshot failed: {error}"), }; assert_eq!(snapshot.program_targets.len(), 3); assert_eq!(snapshot.account_targets.len(), 1); assert_eq!(snapshot.account_targets[0].address, "HybridPool111".to_string()); } #[tokio::test] async fn attach_hybrid_observation_relay_records_logs_program_and_account_notifications() { let server = TestWsServer::spawn_hybrid_json_rpc_server().await; let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)]; let manager_result = crate::WsManager::from_ws_endpoints(&endpoints); let manager = match manager_result { Ok(manager) => manager, Err(error) => panic!("from_ws_endpoints failed: {error}"), }; let database = create_database().await; let database = std::sync::Arc::new(database); let attach_result = manager.attach_hybrid_observation_relay(database.clone()).await; if let Err(error) = attach_result { panic!("attach_hybrid_observation_relay failed: {error}"); } let start_result = manager.start_all().await; if let Err(error) = start_result { panic!("start_all failed: {error}"); } let client_option = manager.client("ws_a").await; let client = match client_option { Some(client) => client, None => panic!("client must exist"), }; let logs_subscribe_result = client.logs_subscribe_raw(serde_json::json!("all"), None).await; if let Err(error) = logs_subscribe_result { panic!("logsSubscribe failed: {error}"); } let program_subscribe_result = client.program_subscribe_raw("HybridProgram111".to_string(), None).await; if let Err(error) = program_subscribe_result { panic!("programSubscribe failed: {error}"); } let account_subscribe_result = client.account_subscribe_raw("HybridPool111".to_string(), None).await; if let Err(error) = account_subscribe_result { panic!("accountSubscribe failed: {error}"); } tokio::time::sleep(std::time::Duration::from_millis(300)).await; let observations_result = crate::query_onchain_observations_list_recent(database.as_ref(), 20).await; let observations = match observations_result { Ok(observations) => observations, Err(error) => panic!("list_recent_onchain_observations failed: {error}"), }; let mut kinds = std::collections::BTreeSet::new(); for observation in &observations { kinds.insert(observation.observation_kind.clone()); } assert!(kinds.contains("ws.hybrid.logs_notification")); assert!(kinds.contains("ws.hybrid.program_notification")); assert!(kinds.contains("ws.hybrid.account_notification")); let stop_result = manager.stop_all().await; if let Err(error) = stop_result { panic!("stop_all failed: {error}"); } let detach_result = manager.detach_hybrid_observation_relay().await; if let Err(error) = detach_result { panic!("detach_hybrid_observation_relay failed: {error}"); } server.shutdown().await; } }