diff --git a/CHANGELOG.md b/CHANGELOG.md index eb9d37c..f0e411d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,4 +3,5 @@ 0.0.1 - initial skel 0.0.2 - Socle conforme 0.1.0 - Transport WebSocket générique -0.1.1 = Intégration Tauri minimale du WsClient +0.1.1 - Intégration Tauri minimale du WsClient +0.2.0 - Couche JSON-RPC WS Solana diff --git a/Cargo.toml b/Cargo.toml index 21e1596..9449e77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.1.1" +version = "0.2.0" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" @@ -44,7 +44,7 @@ spl-memo-interface = { version = "^2.0", features = [] } spl-token-interface = { version = "^2.0", features = [] } spl-token-2022-interface = { version = "^2.1", features = [] } sqlx = { version = "^0.8", features = ["chrono", "uuid", "bigdecimal", "json", "sqlite", "runtime-tokio-rustls"] } -tauri = { version = "^2.10", features = ["default"] } +tauri = { version = "^2.10", features = ["default", "tray-icon"] } tauri-build = { version = "2", features = [] } tauri-plugin-tracing = { version = "^0.3", default-features = false, features = [] } tempfile = { version = "^3", features = [] } diff --git a/kb_app/package.json b/kb_app/package.json index 1945f9d..7107937 100644 --- a/kb_app/package.json +++ b/kb_app/package.json @@ -1,7 +1,7 @@ { "name": "kb-app", "private": true, - "version": "0.1.1", + "version": "0.2.0", "type": "module", "scripts": { "dev": "vite", diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs index 7d00386..3f83883 100644 --- a/kb_app/src/lib.rs +++ b/kb_app/src/lib.rs @@ -330,38 +330,84 @@ fn kb_emit_app_log(app_handle: &tauri::AppHandle, message: &str) { } } -fn kb_format_ws_event(event: &kb_lib::WsEvent) -> std::string::String { +fn kb_format_ws_event( + event: &kb_lib::WsEvent, +) -> std::string::String { match event { kb_lib::WsEvent::Connected { endpoint_name, endpoint_url, } => { format!("[ws:{endpoint_name}] connected to {endpoint_url}") - } + }, kb_lib::WsEvent::TextMessage { endpoint_name, text, } => { format!("[ws:{endpoint_name}] text: {text}") - } + }, + kb_lib::WsEvent::JsonRpcMessage { + endpoint_name, + message, + } => { + match message { + kb_lib::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => { + format!( + "[ws:{endpoint_name}] json-rpc success id={} result={}", + response.id, + response.result + ) + }, + kb_lib::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => { + format!( + "[ws:{endpoint_name}] json-rpc error id={} code={} message={}", + response.id, + response.error.code, + response.error.message + ) + }, + kb_lib::KbJsonRpcWsIncomingMessage::Notification(notification) => { + format!( + "[ws:{endpoint_name}] json-rpc notification method={} subscription={} result={}", + notification.method, + notification.params.subscription, + notification.params.result + ) + }, + } + }, + kb_lib::WsEvent::JsonRpcParseError { + endpoint_name, + text, + error, + } => { + format!( + "[ws:{endpoint_name}] json-rpc parse error: {} | raw={}", + error, + text + ) + }, kb_lib::WsEvent::BinaryMessage { endpoint_name, data, } => { - format!("[ws:{endpoint_name}] binary message ({} bytes)", data.len()) - } + format!( + "[ws:{endpoint_name}] binary message ({} bytes)", + data.len() + ) + }, kb_lib::WsEvent::Ping { endpoint_name, data, } => { format!("[ws:{endpoint_name}] ping ({} bytes)", data.len()) - } + }, kb_lib::WsEvent::Pong { endpoint_name, data, } => { format!("[ws:{endpoint_name}] pong ({} bytes)", data.len()) - } + }, kb_lib::WsEvent::CloseReceived { endpoint_name, code, @@ -369,18 +415,21 @@ fn kb_format_ws_event(event: &kb_lib::WsEvent) -> std::string::String { } => { format!( "[ws:{endpoint_name}] close received code={:?} reason={:?}", - code, reason + code, + reason ) - } - kb_lib::WsEvent::Disconnected { endpoint_name } => { + }, + kb_lib::WsEvent::Disconnected { + endpoint_name, + } => { format!("[ws:{endpoint_name}] disconnected") - } + }, kb_lib::WsEvent::Error { endpoint_name, error, } => { format!("[ws:{endpoint_name}] error: {error}") - } + }, } } diff --git a/kb_app/tauri.conf.json b/kb_app/tauri.conf.json index 1003e57..3ad7109 100644 --- a/kb_app/tauri.conf.json +++ b/kb_app/tauri.conf.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.tauri.app/config/2", "productName": "kb-bapp", - "version": "0.1.1", + "version": "0.2.0", "identifier": "com.sasedev.kb-app", "build": { "beforeDevCommand": "npm run dev", diff --git a/kb_app/vite.config.ts b/kb_app/vite.config.ts index 41c3409..64c717d 100644 --- a/kb_app/vite.config.ts +++ b/kb_app/vite.config.ts @@ -56,7 +56,12 @@ export default defineConfig(async () => ({ css: { preprocessorOptions: { scss: { - api: 'modern-compiler', + quietDeps: true, + //silenceDeprecations: ["import", "color-functions", "global-builtin"] as const, + silenceDeprecations: ["import", "color-functions", "global-builtin",], + verbose: false, + //api: 'modern', + //api: 'modern-compiler', importers: [new NodePackageImporter()], } } diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 1fb2a6e..6b3a341 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -3,9 +3,7 @@ //! Core library of the `khadhroony-bobobot` workspace. //! //! This crate contains the reusable backend logic shared by the desktop -//! application and future clients. The first milestone focuses on a -//! conformant project skeleton with configuration loading, tracing setup, -//! shared constants, and transport client placeholders. +//! application and future clients. #![deny(unreachable_pub)] #![warn(missing_docs)] @@ -14,6 +12,7 @@ mod config; mod constants; mod error; mod http_client; +mod rpc_ws; mod tracing; mod types; mod ws_client; @@ -27,6 +26,16 @@ pub use crate::config::KbSolanaConfig; pub use crate::config::KbWsEndpointConfig; pub use crate::constants::*; pub use crate::error::KbError; +pub use crate::rpc_ws::KbJsonRpcWsErrorObject; +pub use crate::rpc_ws::KbJsonRpcWsErrorResponse; +pub use crate::rpc_ws::KbJsonRpcWsIncomingMessage; +pub use crate::rpc_ws::KbJsonRpcWsNotification; +pub use crate::rpc_ws::KbJsonRpcWsNotificationParams; +pub use crate::rpc_ws::KbJsonRpcWsRequest; +pub use crate::rpc_ws::KbJsonRpcWsSuccessResponse; +pub use crate::rpc_ws::kb_is_probable_json_rpc_object_text; +pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_text; +pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use crate::http_client::HttpClient; pub use crate::tracing::KbTracingGuard; pub use crate::tracing::init_tracing; diff --git a/kb_lib/src/rpc_ws.rs b/kb_lib/src/rpc_ws.rs new file mode 100644 index 0000000..c4ec53b --- /dev/null +++ b/kb_lib/src/rpc_ws.rs @@ -0,0 +1,340 @@ +// file: kb_lib/src/rpc_ws.rs + +//! Generic JSON-RPC 2.0 WebSocket helpers. +//! +//! This module provides generic JSON-RPC request and incoming-message parsing +//! helpers for WebSocket-based Solana RPC communication. +//! +//! At this stage, the top-level envelopes are typed while the method-specific +//! payloads remain as `serde_json::Value`. Later versions can progressively +//! replace selected payloads with official Solana RPC client types. + +/// Generic JSON-RPC 2.0 request sent over WebSocket. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsRequest { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Client request identifier. + pub id: serde_json::Value, + /// RPC method name. + pub method: std::string::String, + /// Ordered method parameters. + pub params: std::vec::Vec, +} + +impl KbJsonRpcWsRequest { + /// Creates a new JSON-RPC request with a numeric identifier. + pub fn new_with_u64_id( + id: u64, + method: std::string::String, + params: std::vec::Vec, + ) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id: serde_json::Value::from(id), + method, + params, + } + } + + /// Converts the request into a JSON value. + pub fn to_value(&self) -> Result { + let value_result = serde_json::to_value(self); + match value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize websocket json-rpc request '{}': {error}", + self.method + ))), + } + } + + /// Serializes the request into a compact JSON string. + pub fn to_json_string(&self) -> Result { + let text_result = serde_json::to_string(self); + match text_result { + Ok(text) => Ok(text), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize websocket json-rpc request '{}': {error}", + self.method + ))), + } + } +} + +/// JSON-RPC 2.0 success response. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsSuccessResponse { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Result payload. + pub result: serde_json::Value, + /// Request identifier echoed by the server. + pub id: serde_json::Value, +} + +/// JSON-RPC 2.0 error object. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsErrorObject { + /// Numeric JSON-RPC error code. + pub code: i64, + /// Human-readable error message. + pub message: std::string::String, + /// Optional server-provided structured payload. + pub data: std::option::Option, +} + +/// JSON-RPC 2.0 error response. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsErrorResponse { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Error payload. + pub error: KbJsonRpcWsErrorObject, + /// Request identifier echoed by the server. + pub id: serde_json::Value, +} + +/// JSON-RPC 2.0 notification parameter object. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsNotificationParams { + /// Method-specific result payload. + pub result: serde_json::Value, + /// Active subscription identifier. + pub subscription: u64, +} + +/// JSON-RPC 2.0 notification message. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcWsNotification { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Notification method name such as `slotNotification`. + pub method: std::string::String, + /// Notification payload. + pub params: KbJsonRpcWsNotificationParams, +} + +/// Parsed incoming JSON-RPC WebSocket message. +#[derive(Clone, Debug, PartialEq)] +pub enum KbJsonRpcWsIncomingMessage { + /// JSON-RPC success response. + SuccessResponse(KbJsonRpcWsSuccessResponse), + /// JSON-RPC error response. + ErrorResponse(KbJsonRpcWsErrorResponse), + /// JSON-RPC notification. + Notification(KbJsonRpcWsNotification), +} + +impl KbJsonRpcWsIncomingMessage { + /// Returns a short human-readable kind label. + pub fn kind_name(&self) -> &'static str { + match self { + Self::SuccessResponse(_) => "success_response", + Self::ErrorResponse(_) => "error_response", + Self::Notification(_) => "notification", + } + } +} + +/// Returns `true` when the text looks like a JSON object payload. +/// +/// This is intentionally conservative and only checks for a leading `{` after +/// trimming left-side whitespace. +pub fn kb_is_probable_json_rpc_object_text(text: &str) -> bool { + let trimmed = text.trim_start(); + trimmed.starts_with('{') +} + +/// Parses a raw text message into a JSON-RPC incoming message. +/// +/// This parser accepts only server-originating incoming message shapes: +/// success responses, error responses, and notifications. +pub fn parse_kb_json_rpc_ws_incoming_text( + text: &str, +) -> Result { + let value_result = serde_json::from_str::(text); + let value = match value_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse websocket json-rpc text: {error}" + ))); + } + }; + + parse_kb_json_rpc_ws_incoming_value(&value) +} + +/// Parses a JSON value into a JSON-RPC incoming message. +/// +/// This parser accepts only server-originating incoming message shapes: +/// success responses, error responses, and notifications. +pub fn parse_kb_json_rpc_ws_incoming_value( + value: &serde_json::Value, +) -> Result { + let object = match value.as_object() { + Some(object) => object, + None => { + return Err(crate::KbError::Json( + "json-rpc websocket payload must be a JSON object".to_string(), + )); + } + }; + let jsonrpc_value_option = object.get("jsonrpc"); + let jsonrpc_value = match jsonrpc_value_option { + Some(jsonrpc_value) => jsonrpc_value, + None => { + return Err(crate::KbError::Json( + "json-rpc websocket payload is missing 'jsonrpc'".to_string(), + )); + } + }; + let jsonrpc_string_option = jsonrpc_value.as_str(); + let jsonrpc_string = match jsonrpc_string_option { + Some(jsonrpc_string) => jsonrpc_string, + None => { + return Err(crate::KbError::Json( + "json-rpc websocket field 'jsonrpc' must be a string".to_string(), + )); + } + }; + if jsonrpc_string != "2.0" { + return Err(crate::KbError::Json(format!( + "unsupported json-rpc version '{}'", + jsonrpc_string + ))); + } + let has_method = object.contains_key("method"); + let has_params = object.contains_key("params"); + let has_result = object.contains_key("result"); + let has_error = object.contains_key("error"); + let has_id = object.contains_key("id"); + if has_method && has_params && !has_id { + let notification_result = serde_json::from_value::(value.clone()); + let notification = match notification_result { + Ok(notification) => notification, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse websocket json-rpc notification: {error}" + ))); + } + }; + return Ok(KbJsonRpcWsIncomingMessage::Notification(notification)); + } + if has_id && has_result && !has_error { + let response_result = serde_json::from_value::(value.clone()); + let response = match response_result { + Ok(response) => response, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse websocket json-rpc success response: {error}" + ))); + } + }; + return Ok(KbJsonRpcWsIncomingMessage::SuccessResponse(response)); + } + if has_id && has_error && !has_result { + let response_result = serde_json::from_value::(value.clone()); + let response = match response_result { + Ok(response) => response, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse websocket json-rpc error response: {error}" + ))); + } + }; + return Ok(KbJsonRpcWsIncomingMessage::ErrorResponse(response)); + } + Err(crate::KbError::Json( + "unsupported websocket json-rpc message shape".to_string(), + )) +} + +#[cfg(test)] +mod tests { + #[test] + fn request_serialization_contains_expected_fields() { + let request = crate::KbJsonRpcWsRequest::new_with_u64_id( + 7, + "slotSubscribe".to_string(), + std::vec::Vec::new(), + ); + let value = request + .to_value() + .expect("request value serialization must succeed"); + assert_eq!( + value["jsonrpc"], + serde_json::Value::String("2.0".to_string()) + ); + assert_eq!(value["id"], serde_json::Value::from(7u64)); + assert_eq!( + value["method"], + serde_json::Value::String("slotSubscribe".to_string()) + ); + assert_eq!(value["params"], serde_json::Value::Array(vec![])); + } + + #[test] + fn parse_success_response_works() { + let text = r#"{"jsonrpc":"2.0","result":42,"id":3}"#; + let parsed = crate::parse_kb_json_rpc_ws_incoming_text(text).expect("parse must succeed"); + match parsed { + crate::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => { + assert_eq!(response.jsonrpc, "2.0"); + assert_eq!(response.result, serde_json::Value::from(42u64)); + assert_eq!(response.id, serde_json::Value::from(3u64)); + } + other => { + panic!("unexpected parsed message: {other:?}"); + } + } + } + + #[test] + fn parse_error_response_works() { + let text = + r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":9}"#; + let parsed = crate::parse_kb_json_rpc_ws_incoming_text(text).expect("parse must succeed"); + match parsed { + crate::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => { + assert_eq!(response.jsonrpc, "2.0"); + assert_eq!(response.error.code, -32601); + assert_eq!(response.error.message, "Method not found"); + assert_eq!(response.error.data, None); + assert_eq!(response.id, serde_json::Value::from(9u64)); + } + other => { + panic!("unexpected parsed message: {other:?}"); + } + } + } + + #[test] + fn parse_notification_works() { + let text = r#"{"jsonrpc":"2.0","method":"slotNotification","params":{"result":{"parent":1,"root":2,"slot":3},"subscription":17}}"#; + let parsed = crate::parse_kb_json_rpc_ws_incoming_text(text).expect("parse must succeed"); + match parsed { + crate::KbJsonRpcWsIncomingMessage::Notification(notification) => { + assert_eq!(notification.jsonrpc, "2.0"); + assert_eq!(notification.method, "slotNotification"); + assert_eq!(notification.params.subscription, 17); + assert_eq!( + notification.params.result["slot"], + serde_json::Value::from(3u64) + ); + } + other => { + panic!("unexpected parsed message: {other:?}"); + } + } + } + + #[test] + fn probable_json_rpc_object_text_detects_object() { + assert!(crate::kb_is_probable_json_rpc_object_text( + " {\"jsonrpc\":\"2.0\"}" + )); + assert!(!crate::kb_is_probable_json_rpc_object_text("hello")); + } +} diff --git a/kb_lib/src/ws_client.rs b/kb_lib/src/ws_client.rs index c1087bc..ec29435 100644 --- a/kb_lib/src/ws_client.rs +++ b/kb_lib/src/ws_client.rs @@ -2,20 +2,9 @@ //! Generic asynchronous WebSocket transport client. //! -//! Version `0.1.x` provides a reusable transport-level client built on top of -//! `tokio-tungstenite`. -//! -//! Scope of this version: -//! - explicit connect / disconnect -//! - separate read and write tasks -//! - bounded outgoing channel -//! - broadcast event stream -//! - incremental request identifier generator -//! - graceful close with timeout and task cancellation fallback -//! -//! JSON-RPC request / response matching, subscribe / unsubscribe tracking, -//! and Solana-specific notification routing are intentionally left for later -//! versions. +//! Version `0.2.x` keeps the transport layer introduced in `0.1.x` and adds +//! generic JSON-RPC 2.0 request helpers plus incoming JSON-RPC parsing for +//! text messages received from the server. use futures_util::SinkExt; use futures_util::StreamExt; @@ -36,7 +25,7 @@ pub enum WsOutgoingMessage { } /// Incoming WebSocket transport event emitted by [`crate::WsClient`]. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub enum WsEvent { /// Connection established successfully. Connected { @@ -52,6 +41,22 @@ pub enum WsEvent { /// Received text payload. text: std::string::String, }, + /// Parsed JSON-RPC text message received. + JsonRpcMessage { + /// Stable endpoint name from configuration. + endpoint_name: std::string::String, + /// Parsed JSON-RPC message. + message: crate::KbJsonRpcWsIncomingMessage, + }, + /// JSON-RPC parsing failed for a text message that looked like JSON. + JsonRpcParseError { + /// Stable endpoint name from configuration. + endpoint_name: std::string::String, + /// Original text payload. + text: std::string::String, + /// Parse error. + error: crate::KbError, + }, /// Binary message received. BinaryMessage { /// Stable endpoint name from configuration. @@ -254,7 +259,6 @@ impl WsClient { return Err(kb_error); } }; - let (write_half, read_half) = ws_stream.split(); let (writer_tx, writer_rx) = tokio::sync::mpsc::channel(self.endpoint.write_channel_capacity); @@ -389,6 +393,36 @@ impl WsClient { self.send_text(text).await } + /// Sends a prebuilt JSON-RPC request object. + pub async fn send_json_rpc_request_object( + &self, + request: &crate::KbJsonRpcWsRequest, + ) -> Result<(), crate::KbError> { + let value_result = request.to_value(); + let value = match value_result { + Ok(value) => value, + Err(error) => return Err(error), + }; + self.send_json_value(&value).await + } + + /// Builds and sends a JSON-RPC request with a generated numeric identifier. + /// + /// Returns the generated request identifier. + pub async fn send_json_rpc_request( + &self, + method: std::string::String, + params: std::vec::Vec, + ) -> Result { + let request_id = self.next_request_id(); + let request = crate::KbJsonRpcWsRequest::new_with_u64_id(request_id, method, params); + let send_result = self.send_json_rpc_request_object(&request).await; + match send_result { + Ok(()) => Ok(request_id), + Err(error) => Err(error), + } + } + /// Initiates the close handshake. pub async fn send_close(&self) -> Result<(), crate::KbError> { self.send_message(WsOutgoingMessage::Close).await @@ -582,10 +616,29 @@ impl WsClient { }; match message { tokio_tungstenite::tungstenite::Message::Text(text) => { + let text_string = text.to_string(); self.emit_event(WsEvent::TextMessage { endpoint_name: self.endpoint.name.clone(), - text: text.to_string(), + text: text_string.clone(), }); + if crate::kb_is_probable_json_rpc_object_text(&text_string) { + let parse_result = crate::parse_kb_json_rpc_ws_incoming_text(&text_string); + match parse_result { + Ok(parsed_message) => { + self.emit_event(WsEvent::JsonRpcMessage { + endpoint_name: self.endpoint.name.clone(), + message: parsed_message, + }); + } + Err(error) => { + self.emit_event(WsEvent::JsonRpcParseError { + endpoint_name: self.endpoint.name.clone(), + text: text_string.clone(), + error, + }); + } + } + } } tokio_tungstenite::tungstenite::Message::Binary(data) => { self.emit_event(WsEvent::BinaryMessage { @@ -846,7 +899,7 @@ mod tests { } impl TestWsServer { - async fn spawn() -> Self { + async fn spawn_echo_server() -> Self { let bind_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; let listener = bind_result.expect("listener bind must succeed"); let local_addr = listener.local_addr().expect("local addr must be available"); @@ -912,6 +965,102 @@ mod tests { } } + async fn spawn_json_rpc_server() -> Self { + let bind_result = tokio::net::TcpListener::bind("127.0.0.1:0").await; + let listener = bind_result.expect("listener bind must succeed"); + let local_addr = listener.local_addr().expect("local addr must be available"); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + }, + accept_result = listener.accept() => { + let (stream, _peer_addr) = accept_result.expect("accept must succeed"); + tokio::spawn(async move { + let accept_ws_result = tokio_tungstenite::accept_async(stream).await; + let mut ws_stream = accept_ws_result.expect("websocket accept must succeed"); + loop { + let message_option = ws_stream.next().await; + let message_result = match message_option { + Some(message_result) => message_result, + None => { + break; + }, + }; + let message = message_result.expect("message read must succeed"); + match message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + let value: serde_json::Value = serde_json::from_str(text.as_ref()) + .expect("request json must parse"); + let method = value["method"].as_str().expect("method must be a string"); + let id = value["id"].clone(); + if method == "slotSubscribe" { + let response = serde_json::json!({ + "jsonrpc": "2.0", + "result": 77, + "id": id + }); + ws_stream.send( + tokio_tungstenite::tungstenite::Message::Text(response.to_string().into()) + ).await.expect("success response send must succeed"); + let notification = serde_json::json!({ + "jsonrpc": "2.0", + "method": "slotNotification", + "params": { + "result": { + "parent": 10, + "root": 11, + "slot": 12 + }, + "subscription": 77 + } + }); + ws_stream.send( + tokio_tungstenite::tungstenite::Message::Text(notification.to_string().into()) + ).await.expect("notification send must succeed"); + } else { + let response = serde_json::json!({ + "jsonrpc": "2.0", + "error": { + "code": -32601, + "message": "Method not found" + }, + "id": id + }); + ws_stream.send( + tokio_tungstenite::tungstenite::Message::Text(response.to_string().into()) + ).await.expect("error response send must succeed"); + } + }, + tokio_tungstenite::tungstenite::Message::Close(frame) => { + let _ = ws_stream.send( + tokio_tungstenite::tungstenite::Message::Close(frame) + ).await; + break; + }, + tokio_tungstenite::tungstenite::Message::Binary(_data) => {}, + tokio_tungstenite::tungstenite::Message::Ping(data) => { + ws_stream.send( + tokio_tungstenite::tungstenite::Message::Pong(data) + ).await.expect("pong reply must succeed"); + }, + tokio_tungstenite::tungstenite::Message::Pong(_data) => {}, + tokio_tungstenite::tungstenite::Message::Frame(_frame) => {}, + } + } + }); + }, + } + } + }); + Self { + url: format!("ws://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + } + } + async fn shutdown(mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { let _ = shutdown_tx.send(()); @@ -958,7 +1107,7 @@ mod tests { #[tokio::test] async fn connect_send_text_and_disconnect() { - let server = TestWsServer::spawn().await; + let server = TestWsServer::spawn_echo_server().await; let endpoint = make_ws_endpoint(server.url.clone()); let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); let mut receiver = client.subscribe_events(); @@ -1013,7 +1162,7 @@ mod tests { #[tokio::test] async fn connect_twice_returns_invalid_state() { - let server = TestWsServer::spawn().await; + let server = TestWsServer::spawn_echo_server().await; let endpoint = make_ws_endpoint(server.url.clone()); let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); client.connect().await.expect("first connect must succeed"); @@ -1034,7 +1183,7 @@ mod tests { #[tokio::test] async fn send_ping_receives_pong_event() { - let server = TestWsServer::spawn().await; + let server = TestWsServer::spawn_echo_server().await; let endpoint = make_ws_endpoint(server.url.clone()); let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); let mut receiver = client.subscribe_events(); @@ -1060,4 +1209,106 @@ mod tests { client.disconnect().await.expect("disconnect must succeed"); server.shutdown().await; } + + #[tokio::test] + async fn send_json_rpc_request_emits_success_response_and_notification() { + let server = TestWsServer::spawn_json_rpc_server().await; + let endpoint = make_ws_endpoint(server.url.clone()); + let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); + let mut receiver = client.subscribe_events(); + client.connect().await.expect("connect must succeed"); + let _ = recv_event(&mut receiver).await; + let request_id = client + .send_json_rpc_request("slotSubscribe".to_string(), std::vec::Vec::new()) + .await + .expect("json-rpc send must succeed"); + assert_eq!(request_id, 1); + let mut success_seen = false; + let mut notification_seen = false; + for _ in 0..6 { + let event = recv_event(&mut receiver).await; + match event { + crate::WsEvent::JsonRpcMessage { + endpoint_name, + message, + } => { + assert_eq!(endpoint_name, "test_ws"); + match message { + crate::KbJsonRpcWsIncomingMessage::SuccessResponse(response) => { + assert_eq!(response.id, serde_json::Value::from(1u64)); + assert_eq!(response.result, serde_json::Value::from(77u64)); + success_seen = true; + } + crate::KbJsonRpcWsIncomingMessage::Notification(notification) => { + assert_eq!(notification.method, "slotNotification"); + assert_eq!(notification.params.subscription, 77); + assert_eq!( + notification.params.result["slot"], + serde_json::Value::from(12u64) + ); + notification_seen = true; + } + crate::KbJsonRpcWsIncomingMessage::ErrorResponse(other) => { + panic!("unexpected error response: {other:?}"); + } + } + } + crate::WsEvent::TextMessage { .. } => {} + other => { + panic!("unexpected event: {other:?}"); + } + } + + if success_seen && notification_seen { + break; + } + } + assert!(success_seen, "json-rpc success response must be observed"); + assert!(notification_seen, "json-rpc notification must be observed"); + client.disconnect().await.expect("disconnect must succeed"); + server.shutdown().await; + } + + #[tokio::test] + async fn send_unknown_json_rpc_method_emits_error_response() { + let server = TestWsServer::spawn_json_rpc_server().await; + let endpoint = make_ws_endpoint(server.url.clone()); + let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); + let mut receiver = client.subscribe_events(); + client.connect().await.expect("connect must succeed"); + let _ = recv_event(&mut receiver).await; + let request_id = client + .send_json_rpc_request("unknownMethod".to_string(), std::vec::Vec::new()) + .await + .expect("json-rpc send must succeed"); + assert_eq!(request_id, 1); + let mut error_seen = false; + for _ in 0..4 { + let event = recv_event(&mut receiver).await; + match event { + crate::WsEvent::JsonRpcMessage { message, .. } => match message { + crate::KbJsonRpcWsIncomingMessage::ErrorResponse(response) => { + assert_eq!(response.id, serde_json::Value::from(1u64)); + assert_eq!(response.error.code, -32601); + assert_eq!(response.error.message, "Method not found"); + error_seen = true; + break; + } + crate::KbJsonRpcWsIncomingMessage::SuccessResponse(other) => { + panic!("unexpected success response: {other:?}"); + } + crate::KbJsonRpcWsIncomingMessage::Notification(other) => { + panic!("unexpected notification: {other:?}"); + } + }, + crate::WsEvent::TextMessage { .. } => {} + other => { + panic!("unexpected event: {other:?}"); + } + } + } + assert!(error_seen, "json-rpc error response must be observed"); + client.disconnect().await.expect("disconnect must succeed"); + server.shutdown().await; + } }