// file: kb_app/src/demo_ws_manager.rs //! Demo WebSocket manager window commands and runtime state. //! //! This module provides a lightweight test bench for `kb_lib::WsManager`. use tauri::Emitter; use tauri::Manager; /// Static endpoint summary enriched with current manager state. #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoWsManagerEndpointSummary { name: std::string::String, resolved_url: std::string::String, provider: std::string::String, roles: std::vec::Vec, connection_state: std::string::String, active_subscription_count: usize, } /// Global demo manager snapshot payload. #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoWsManagerSnapshotPayload { endpoint_count: usize, started_count: usize, endpoints: std::vec::Vec, } /// Runtime state for the demo WebSocket manager window. #[derive(Debug)] pub(crate) struct KbDemoWsManagerRuntimeState { relay_task: std::option::Option>, } impl KbDemoWsManagerRuntimeState { /// Creates a new empty runtime state. pub(crate) fn new() -> Self { Self { relay_task: None } } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] struct DemoWsManagerActionResult { action: std::string::String, target: std::string::String, matched_count: usize, changed_count: usize, unchanged_count: usize, } /// Shows and focuses the preconfigured `demo_ws_manager` window. #[tauri::command] pub(crate) async fn open_demo_ws_manager_window( app_handle: tauri::AppHandle, state: tauri::State<'_, crate::KbAppState>, ) -> Result<(), std::string::String> { kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; let existing_window_option = app_handle.get_webview_window("demo_ws_manager"); let demo_window = match existing_window_option { Some(demo_window) => demo_window, None => { let builder = tauri::WebviewWindowBuilder::new( &app_handle, "demo_ws_manager", tauri::WebviewUrl::App("demo_ws_manager.html".into()), ) .title("Demo Ws Manager") .inner_size(1280.0, 800.0) .min_inner_size(900.0, 620.0) .center() .visible(true) .transparent(false) .decorations(true); let build_result = builder.build(); match build_result { Ok(window) => window, Err(error) => { return Err(format!("cannot create demo_ws_manager window: {error:?}")); } } } }; let show_result = demo_window.show(); if let Err(error) = show_result { return Err(format!("cannot show demo_ws_manager window: {error:?}")); } let focus_result = demo_window.set_focus(); if let Err(error) = focus_result { return Err(format!("cannot focus demo_ws_manager window: {error:?}")); } kb_emit_demo_ws_manager_log(&app_handle, "[ui] demo_ws_manager window loaded"); kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; Ok(()) } /// Returns the current manager snapshot. #[tauri::command] pub(crate) async fn demo_ws_manager_get_snapshot( state: tauri::State<'_, crate::KbAppState>, ) -> Result { kb_build_demo_ws_manager_snapshot(&state).await } /// Returns the distinct configured roles for enabled websocket endpoints. #[tauri::command] pub(crate) async fn demo_ws_manager_list_roles( state: tauri::State<'_, crate::KbAppState>, ) -> Result, std::string::String> { let mut roles = std::collections::BTreeSet::new(); for endpoint in &state.config.solana.ws_endpoints { if !endpoint.enabled { continue; } for role in &endpoint.roles { roles.insert(role.clone()); } } Ok(roles.into_iter().collect()) } /// Starts all managed websocket endpoints. #[tauri::command] pub(crate) async fn demo_ws_manager_start_all( app_handle: tauri::AppHandle, state: tauri::State<'_, crate::KbAppState>, ) -> Result { kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; let matched_count = state.ws_manager.endpoint_names().await.len(); let start_result = state.ws_manager.start_all().await; let changed_count = match start_result { Ok(changed_count) => changed_count, Err(error) => return Err(error.to_string()), }; let action_result = kb_build_action_result("start", "all", matched_count, changed_count); kb_emit_demo_ws_manager_log( &app_handle, kb_format_action_result_for_log(&action_result).as_str(), ); kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; kb_build_demo_ws_manager_snapshot(&state).await } /// Stops all managed websocket endpoints. #[tauri::command] pub(crate) async fn demo_ws_manager_stop_all( app_handle: tauri::AppHandle, state: tauri::State<'_, crate::KbAppState>, ) -> Result { let matched_count = state.ws_manager.endpoint_names().await.len(); let stop_result = state.ws_manager.stop_all().await; let changed_count = match stop_result { Ok(changed_count) => changed_count, Err(error) => return Err(error.to_string()), }; let action_result = kb_build_action_result("stop", "all", matched_count, changed_count); kb_emit_demo_ws_manager_log( &app_handle, kb_format_action_result_for_log(&action_result).as_str(), ); kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; kb_build_demo_ws_manager_snapshot(&state).await } /// Starts all managed websocket endpoints having the selected role. #[tauri::command] pub(crate) async fn demo_ws_manager_start_role( app_handle: tauri::AppHandle, state: tauri::State<'_, crate::KbAppState>, role: std::string::String, ) -> Result { kb_ensure_demo_ws_manager_relay(&app_handle, &state).await; let matched_count = state .ws_manager .endpoint_names_for_role(role.as_str()) .await .len(); let start_result = state.ws_manager.start_role(role.as_str()).await; let changed_count = match start_result { Ok(changed_count) => changed_count, Err(error) => return Err(error.to_string()), }; let action_result = kb_build_action_result("start", role.as_str(), matched_count, changed_count); kb_emit_demo_ws_manager_log( &app_handle, kb_format_action_result_for_log(&action_result).as_str(), ); kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; kb_build_demo_ws_manager_snapshot(&state).await } /// Stops all managed websocket endpoints having the selected role. #[tauri::command] pub(crate) async fn demo_ws_manager_stop_role( app_handle: tauri::AppHandle, state: tauri::State<'_, crate::KbAppState>, role: std::string::String, ) -> Result { let matched_count = state .ws_manager .endpoint_names_for_role(role.as_str()) .await .len(); let stop_result = state.ws_manager.stop_role(role.as_str()).await; let changed_count = match stop_result { Ok(changed_count) => changed_count, Err(error) => return Err(error.to_string()), }; let action_result = kb_build_action_result("stop", role.as_str(), matched_count, changed_count); kb_emit_demo_ws_manager_log( &app_handle, kb_format_action_result_for_log(&action_result).as_str(), ); kb_emit_demo_ws_manager_snapshot(&app_handle, &state).await; kb_build_demo_ws_manager_snapshot(&state).await } async fn kb_build_demo_ws_manager_snapshot( state: &tauri::State<'_, crate::KbAppState>, ) -> Result { let snapshot_result = state.ws_manager.snapshot().await; let snapshot = match snapshot_result { Ok(snapshot) => snapshot, Err(error) => return Err(error.to_string()), }; let mut endpoints = std::vec::Vec::new(); for managed_endpoint in snapshot.endpoints { let config_endpoint_option = state .config .find_ws_endpoint(&managed_endpoint.endpoint_name); let config_endpoint = match config_endpoint_option { Some(config_endpoint) => config_endpoint, None => { return Err(format!( "managed websocket endpoint '{}' is missing from config", managed_endpoint.endpoint_name )); } }; endpoints.push(KbDemoWsManagerEndpointSummary { name: managed_endpoint.endpoint_name, resolved_url: managed_endpoint.resolved_url, provider: managed_endpoint.provider, roles: config_endpoint.roles.clone(), connection_state: kb_connection_state_to_string(managed_endpoint.state), active_subscription_count: managed_endpoint.active_subscription_count, }); } Ok(KbDemoWsManagerSnapshotPayload { endpoint_count: snapshot.endpoint_count, started_count: snapshot.started_count, endpoints, }) } async fn kb_emit_demo_ws_manager_snapshot( app_handle: &tauri::AppHandle, state: &tauri::State<'_, crate::KbAppState>, ) { let snapshot_result = kb_build_demo_ws_manager_snapshot(state).await; let snapshot = match snapshot_result { Ok(snapshot) => snapshot, Err(error) => { kb_emit_demo_ws_manager_log(app_handle, &format!("[ui] snapshot error: {error}")); return; } }; let emit_result = app_handle.emit("kb-demo-ws-manager-snapshot", snapshot); if let Err(error) = emit_result { tracing::error!("error emitting demo_ws_manager snapshot event: {error:?}"); } } async fn kb_ensure_demo_ws_manager_relay( app_handle: &tauri::AppHandle, state: &tauri::State<'_, crate::KbAppState>, ) { let mut runtime_guard = state.demo_ws_manager_runtime.lock().await; if runtime_guard.relay_task.is_some() { return; } let mut receiver = state.ws_manager.subscribe_events(); let relay_app_handle = app_handle.clone(); let relay_state = state.demo_ws_manager_runtime.clone(); let relay_task = tauri::async_runtime::spawn(async move { loop { let recv_result = receiver.recv().await; match recv_result { Ok(event) => { let line = kb_format_ws_event(&event); kb_emit_demo_ws_manager_log(&relay_app_handle, line.as_str()); } Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { kb_emit_demo_ws_manager_log( &relay_app_handle, &format!( "[manager] event receiver lagged and skipped {} message(s)", skipped ), ); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; } } } let mut runtime_guard = relay_state.lock().await; runtime_guard.relay_task = None; }); runtime_guard.relay_task = Some(relay_task); } fn kb_emit_demo_ws_manager_log(app_handle: &tauri::AppHandle, message: &str) { let emit_result = app_handle.emit("kb-demo-ws-manager-log", message.to_string()); if let Err(error) = emit_result { tracing::error!("error emitting demo_ws_manager log event: {error:?}"); } } fn kb_connection_state_to_string(state: kb_lib::KbConnectionState) -> std::string::String { match state { kb_lib::KbConnectionState::Disconnected => "Disconnected".to_string(), kb_lib::KbConnectionState::Connecting => "Connecting".to_string(), kb_lib::KbConnectionState::Connected => "Connected".to_string(), kb_lib::KbConnectionState::Disconnecting => "Disconnecting".to_string(), } } fn kb_format_ws_event(event: &kb_lib::WsEvent) -> std::string::String { match event { kb_lib::WsEvent::Connected { endpoint_name, endpoint_url, } => { format!("[ws:{endpoint_name}] connected to {endpoint_url}") } kb_lib::WsEvent::TextMessage { endpoint_name, text, } => { format!("[ws:{endpoint_name}] text: {text}") } kb_lib::WsEvent::JsonRpcMessage { endpoint_name, message, } => match message { kb_lib::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => { format!( "[ws:{endpoint_name}] json-rpc success id={} result={}", response.id, response.result ) } kb_lib::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => { format!( "[ws:{endpoint_name}] json-rpc error id={} code={} message={}", response.id, response.error.code, response.error.message ) } kb_lib::KbJsonRpcWsIncomingMessage::Notification(notification) => { format!( "[ws:{endpoint_name}] json-rpc notification method={} subscription={} result={}", notification.method, notification.params.subscription, notification.params.result ) } }, kb_lib::WsEvent::JsonRpcParseError { endpoint_name, text, error, } => { format!( "[ws:{endpoint_name}] json-rpc parse error: {} | raw={}", error, text ) } kb_lib::WsEvent::SubscriptionRegistered { endpoint_name, subscription, } => { format!( "[ws:{endpoint_name}] subscription registered subscribe_method={} unsubscribe_method={} notification_method={} request_id={} subscription_id={}", subscription.subscribe_method, subscription.unsubscribe_method, subscription.notification_method, subscription.request_id, subscription.subscription_id ) } kb_lib::WsEvent::SubscriptionNotification { endpoint_name, subscription, notification, method_matches_registry, } => { format!( "[ws:{endpoint_name}] tracked notification subscription_id={} method={} expected={} matches={} result={}", subscription.subscription_id, notification.method, subscription.notification_method, method_matches_registry, notification.params.result ) } kb_lib::WsEvent::JsonRpcNotificationWithoutSubscription { endpoint_name, notification, } => { format!( "[ws:{endpoint_name}] untracked notification method={} subscription={} result={}", notification.method, notification.params.subscription, notification.params.result ) } kb_lib::WsEvent::SubscriptionUnregistered { endpoint_name, subscription_id, unsubscribe_method, was_active, } => { format!( "[ws:{endpoint_name}] subscription unregistered subscription_id={} unsubscribe_method={} was_active={}", subscription_id, unsubscribe_method, was_active ) } kb_lib::WsEvent::BinaryMessage { endpoint_name, data, } => { format!("[{endpoint_name}] binary ({} bytes)", data.len()) } kb_lib::WsEvent::Ping { endpoint_name, data, } => { format!("[{endpoint_name}] ping ({} bytes)", data.len()) } kb_lib::WsEvent::Pong { endpoint_name, data, } => { format!("[{endpoint_name}] pong ({} bytes)", data.len()) } kb_lib::WsEvent::CloseReceived { endpoint_name, code, reason, } => { format!( "[ws:{endpoint_name}] close received code={:?} reason={:?}", code, reason ) } kb_lib::WsEvent::Disconnected { endpoint_name } => { format!("[ws:{endpoint_name}] disconnected") } kb_lib::WsEvent::Error { endpoint_name, error, } => { format!("[ws:{endpoint_name}] error: {error}") } } } fn kb_build_action_result( action: &str, target: &str, matched_count: usize, changed_count: usize, ) -> DemoWsManagerActionResult { let unchanged_count = matched_count.saturating_sub(changed_count); DemoWsManagerActionResult { action: action.to_string(), target: target.to_string(), matched_count, changed_count, unchanged_count, } } fn kb_action_past_tense(action: &str) -> &'static str { match action { "start" => "started", "stop" => "stopped", _ => "processed", } } fn kb_format_action_result_for_log(result: &DemoWsManagerActionResult) -> std::string::String { let is_all = result.target == "all"; let past = kb_action_past_tense(result.action.as_str()); if result.matched_count == 0 { if is_all { return "[ui] no managed websocket endpoint is configured".to_string(); } return format!( "[ui] no managed websocket endpoint matches role '{}'", result.target ); } if result.changed_count == 0 { if is_all { return format!( "[ui] all managed websocket endpoints were already {}", if result.action == "start" { "started" } else { "stopped" } ); } return format!( "[ui] role '{}' was already {} on {} endpoint(s)", result.target, if result.action == "start" { "started" } else { "stopped" }, result.unchanged_count ); } if result.unchanged_count == 0 { if is_all { return format!( "[ui] {}ed {} managed websocket endpoint(s)", past, result.changed_count ); } return format!( "[ui] {}ed role '{}' on {} endpoint(s)", past, result.target, result.changed_count ); } if is_all { return format!( "[ui] {}ed {} managed websocket endpoint(s); {} already {}", past, result.changed_count, result.unchanged_count, if result.action == "start" { "started" } else { "stopped" } ); } format!( "[ui] {}ed role '{}' on {} endpoint(s); {} already {}", result.action, result.target, result.changed_count, result.unchanged_count, if result.action == "start" { "started" } else { "stopped" } ) }