From c2ec9b8b41c8b4a1d1efb1f52f65f0d23495d30c Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sat, 18 Apr 2026 14:38:04 +0200 Subject: [PATCH] 0.4.4 --- Cargo.toml | 2 +- khbb_lib/src/lib.rs | 2 + khbb_lib/src/listener.rs | 299 +++++++++------- khbb_lib/src/solana_rpc_ws.rs | 623 +++++++++++++++++++++++----------- 4 files changed, 611 insertions(+), 315 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9ddbf64..02d1959 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.3" +version = "0.4.4" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index 2f2d771..793ac9d 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -63,3 +63,5 @@ pub use crate::solana_rpc_ws::KhbbWsJsonRpcResponseEnvelope; pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope; /// Notification params envelope used by Solana PubSub notifications. pub use crate::solana_rpc_ws::KhbbWsNotificationParams; +/// Classified incoming WebSocket message. +pub use crate::solana_rpc_ws::KhbbWsIncomingMessage; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index 48a151e..6322522 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -27,6 +27,7 @@ pub async fn run_listener_runtime( let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms); let mut interval = tokio::time::interval(tick_duration); let mut tick_count: u64 = 0; + let mut final_status = std::string::String::from("stopped"); let http_client_config = crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() }; let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config); @@ -45,7 +46,6 @@ pub async fn run_listener_runtime( return Err(error); }, }; - let mut ws_subscription_handles = std::vec::Vec::::new(); let ws_connect_result = ws_client.connect().await; match ws_connect_result { Ok(()) => { @@ -59,6 +59,7 @@ pub async fn run_listener_runtime( return Err(error); }, } + let mut ws_subscription_handles = std::vec::Vec::::new(); if config.enable_ws_slot_subscribe { let slot_subscribe_result = ws_client.slot_subscribe(1).await; let slot_subscribe_output = match slot_subscribe_result { @@ -234,7 +235,6 @@ pub async fn run_listener_runtime( program_request_id = program_request_id.saturating_add(1); } } - let mut final_status = std::string::String::from("stopped"); loop { tokio::select! { _ = interval.tick() => { @@ -253,7 +253,7 @@ pub async fn run_listener_runtime( "ok" }; let insert_result = crate::storage::insert_raw_http_rpc_message( - &pool, + pool, session.id, call_output.request_id as i64, &call_output.method, @@ -286,133 +286,161 @@ pub async fn run_listener_runtime( } let ws_read_timeout_result = tokio::time::timeout( std::time::Duration::from_millis(50), - ws_client.read_next_text_message(), + ws_client.read_next_incoming_message(), ) .await; match ws_read_timeout_result { Ok(read_result) => { match read_result { - Ok(Some(message_text)) => { + Ok(crate::KhbbWsIncomingMessage::Response { raw, id, .. }) => { let insert_ws_message_result = crate::storage::insert_raw_ws_message( pool, session.id, "incoming", - &message_text, + &raw, ) .await; match insert_ws_message_result { Ok(()) => { tracing::trace!( listener_session_id = session.id, - "raw websocket message stored" + response_id = id, + "raw websocket response stored" ); } Err(error) => { tracing::error!( listener_session_id = session.id, error = %error, - "failed to store incoming websocket message" - ); - } - } - let method_value_result = - serde_json::from_str::(&message_text); - match method_value_result { - Ok(json_value) => { - let method_option = json_value - .get("method") - .and_then(serde_json::Value::as_str); - match method_option { - Some("slotNotification") => { - let parse_result = - crate::solana_rpc_ws::parse_slot_notification(&message_text); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - slot = notification.params.result.slot, - parent = notification.params.result.parent, - root = notification.params.result.root, - "parsed slot notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse slot notification" - ); - } - } - } - Some("logsNotification") => { - let parse_result = - crate::solana_rpc_ws::parse_logs_notification(&message_text); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - signature = %notification.params.result.value.signature, - "parsed logs notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse logs notification" - ); - } - } - } - Some("programNotification") => { - let parse_result = - crate::solana_rpc_ws::parse_program_notification(&message_text); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - program_pubkey = %notification.params.result.value.pubkey, - "parsed program notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse program notification" - ); - } - } - } - Some(other_method) => { - tracing::trace!( - listener_session_id = session.id, - method = %other_method, - "received unsupported websocket notification method" - ); - } - None => { - tracing::trace!( - listener_session_id = session.id, - "received websocket json message without notification method" - ); - } - } - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to decode websocket message as json value" + "failed to store incoming websocket response" ); } } } - Ok(None) => { + Ok(crate::KhbbWsIncomingMessage::Notification { raw, method, .. }) => { + let insert_ws_message_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &raw, + ) + .await; + match insert_ws_message_result { + Ok(()) => { + tracing::trace!( + listener_session_id = session.id, + method = %method, + "raw websocket notification stored" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming websocket notification" + ); + } + } + match method.as_str() { + "slotNotification" => { + let parse_result = + crate::solana_rpc_ws::parse_slot_notification(&raw); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + slot = notification.params.result.slot, + parent = notification.params.result.parent, + root = notification.params.result.root, + "parsed slot notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse slot notification" + ); + } + } + } + "logsNotification" => { + let parse_result = + crate::solana_rpc_ws::parse_logs_notification(&raw); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + signature = %notification.params.result.value.signature, + "parsed logs notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse logs notification" + ); + } + } + } + "programNotification" => { + let parse_result = + crate::solana_rpc_ws::parse_program_notification(&raw); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + program_pubkey = %notification.params.result.value.pubkey, + "parsed program notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse program notification" + ); + } + } + } + _ => { + tracing::trace!( + listener_session_id = session.id, + method = %method, + "received unsupported websocket notification method" + ); + } + } + } + Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => { + let insert_ws_message_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &raw, + ) + .await; + match insert_ws_message_result { + Ok(()) => { + tracing::trace!( + listener_session_id = session.id, + "raw unknown websocket message stored" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming unknown websocket message" + ); + } + } + } + Ok(crate::KhbbWsIncomingMessage::StreamEnded) => { tracing::info!( listener_session_id = session.id, "websocket stream ended" @@ -426,6 +454,7 @@ pub async fn run_listener_runtime( error = %error, "failed to read websocket message" ); + final_status = std::string::String::from("ws_read_error"); break; } @@ -462,6 +491,7 @@ pub async fn run_listener_runtime( ); } } + return Err(crate::KhbbError::Runtime { context: "wait for ctrl-c", message: error.to_string(), @@ -471,31 +501,50 @@ pub async fn run_listener_runtime( } } } + tracing::info!( + listener_session_id = session.id, + subscription_count = ws_subscription_handles.len(), + "starting websocket unsubscribe phase" + ); for subscription_handle in &ws_subscription_handles { - let unsubscribe_result = ws_client - .unsubscribe( + let unsubscribe_timeout_result = tokio::time::timeout( + std::time::Duration::from_millis(500), + ws_client.unsubscribe( subscription_handle.kind, subscription_handle.subscription_id, - tick_count.saturating_add(subscription_handle.request_id), - ) - .await; - match unsubscribe_result { - Ok(value) => { - tracing::info!( - listener_session_id = session.id, - unsubscribed = value, - subscription_id = subscription_handle.subscription_id, - kind = ?subscription_handle.kind, - "websocket subscription cancelled" - ); + 1000u64 + .saturating_add(tick_count) + .saturating_add(subscription_handle.request_id), + ), + ) + .await; + match unsubscribe_timeout_result { + Ok(unsubscribe_result) => match unsubscribe_result { + Ok(value) => { + tracing::info!( + listener_session_id = session.id, + unsubscribed = value, + subscription_id = subscription_handle.subscription_id, + kind = ?subscription_handle.kind, + "websocket subscription cancelled" + ); + }, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + subscription_id = subscription_handle.subscription_id, + kind = ?subscription_handle.kind, + "failed to cancel websocket subscription" + ); + }, }, - Err(error) => { + Err(_) => { tracing::error!( listener_session_id = session.id, - error = %error, subscription_id = subscription_handle.subscription_id, kind = ?subscription_handle.kind, - "failed to cancel websocket subscription" + "websocket unsubscribe timed out" ); }, } @@ -517,7 +566,11 @@ pub async fn run_listener_runtime( crate::storage::update_listener_session_status(pool, session.id, &final_status).await; match status_update_result { Ok(()) => { - tracing::info!(listener_session_id = session.id, "listener runtime stopped"); + tracing::info!( + listener_session_id = session.id, + final_status = %final_status, + "listener runtime stopped" + ); }, Err(error) => { return Err(error); diff --git a/khbb_lib/src/solana_rpc_ws.rs b/khbb_lib/src/solana_rpc_ws.rs index a72278c..2535e4e 100644 --- a/khbb_lib/src/solana_rpc_ws.rs +++ b/khbb_lib/src/solana_rpc_ws.rs @@ -147,6 +147,50 @@ pub struct KhbbWsSubscribeCallOutput { pub response_body: std::string::String, } +/// Classified incoming WebSocket message. +#[derive(Debug, Clone)] +pub enum KhbbWsIncomingMessage { + /// A JSON-RPC response message identified by `id`. + Response { + /// Raw JSON message. + raw: std::string::String, + /// JSON-RPC response id. + id: u64, + /// Parsed JSON value. + json: serde_json::Value, + }, + /// A PubSub notification message identified by `method`. + Notification { + /// Raw JSON message. + raw: std::string::String, + /// Notification method. + method: std::string::String, + /// Parsed JSON value. + json: serde_json::Value, + }, + /// A JSON message that is neither a response nor a known notification. + Unknown { + /// Raw JSON message. + raw: std::string::String, + /// Parsed JSON value. + json: serde_json::Value, + }, + /// The WebSocket stream ended. + StreamEnded, +} + +impl KhbbWsIncomingMessage { + /// Returns the raw JSON message if this variant carries one. + pub fn raw_message_text(&self) -> std::option::Option<&str> { + match self { + Self::Response { raw, .. } => Some(raw.as_str()), + Self::Notification { raw, .. } => Some(raw.as_str()), + Self::Unknown { raw, .. } => Some(raw.as_str()), + Self::StreamEnded => None, + } + } +} + /// Minimal Solana WebSocket JSON-RPC client. #[derive(Debug)] pub struct KhbbSolanaWsRpcClient { @@ -158,6 +202,8 @@ pub struct KhbbSolanaWsRpcClient { tokio_tungstenite::MaybeTlsStream, >, >, + /// Pending incoming messages captured while waiting for specific responses. + pub(crate) pending_incoming_messages: std::collections::VecDeque, } impl KhbbSolanaWsRpcClient { @@ -172,7 +218,11 @@ impl KhbbSolanaWsRpcClient { ), }); } - Ok(Self { config, ws_stream: None }) + Ok(Self { + config, + ws_stream: None, + pending_incoming_messages: std::collections::VecDeque::new(), + }) } /// Connects the WebSocket client to the configured endpoint. @@ -270,11 +320,181 @@ impl KhbbSolanaWsRpcClient { } } - /// Reads the next text message from the socket. + /// Reads the next classified incoming message. /// - /// This method skips ping and pong frames, answers ping with pong, and - /// returns `Ok(None)` on close or end-of-stream. - pub async fn read_next_text_message( + /// This method first drains the internal pending queue, then reads from the + /// socket if the queue is empty. + pub async fn read_next_incoming_message( + &mut self, + ) -> core::result::Result { + let pending_message_option = self.pending_incoming_messages.pop_front(); + if let Some(message) = pending_message_option { + return Ok(message); + } + let raw_message_result = self.read_next_raw_text_message().await; + let raw_message_option = match raw_message_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let raw_message = match raw_message_option { + Some(value) => value, + None => { + return Ok(KhbbWsIncomingMessage::StreamEnded); + }, + }; + classify_incoming_message(&raw_message) + } + + /// Performs a `slotSubscribe` call. + pub async fn slot_subscribe( + &mut self, + id: u64, + ) -> core::result::Result { + self.subscribe_with_raw_response( + KhbbWsSubscriptionKind::Slot, + std::vec::Vec::::new(), + id, + ) + .await + } + + /// Performs a `logsSubscribe` call. + pub async fn logs_subscribe( + &mut self, + filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, + config: std::option::Option, + id: u64, + ) -> core::result::Result { + let params_result = build_logs_subscribe_params(filter, config); + let params = match params_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await + } + + /// Performs a `programSubscribe` call. + pub async fn program_subscribe( + &mut self, + program_id: &str, + config: std::option::Option, + id: u64, + ) -> core::result::Result { + let params_result = build_program_subscribe_params(program_id, config); + let params = match params_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id) + .await + } + + /// Performs a generic unsubscribe call. + pub async fn unsubscribe( + &mut self, + kind: KhbbWsSubscriptionKind, + subscription_id: u64, + id: u64, + ) -> core::result::Result { + let request_body_result = self + .send_json_rpc_request( + kind.unsubscribe_method_name(), + vec![serde_json::Value::Number(subscription_id.into())], + id, + ) + .await; + match request_body_result { + Ok(_) => {} + Err(error) => { + return Err(error); + } + } + let response_body_result = self.wait_for_response_raw_by_id(id).await; + let response_body = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + } + }; + let parsed_response_result = parse_json_rpc_response::(&response_body); + let parsed_response = match parsed_response_result { + Ok(value) => value, + Err(error) => { + return Err(error); + } + }; + if let Some(error_value) = parsed_response.error { + let error_text_result = serde_json::to_string(&error_value); + let error_text = match error_text_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "serialize websocket unsubscribe rpc error", + message: error.to_string(), + }); + } + }; + return Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned rpc error", + message: error_text, + }); + } + match parsed_response.result { + Some(value) => Ok(value), + None => Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned empty result", + message: response_body, + }), + } + } + + async fn subscribe_with_raw_response( + &mut self, + kind: KhbbWsSubscriptionKind, + params: TParams, + id: u64, + ) -> core::result::Result + where + TParams: serde::Serialize + serde::de::DeserializeOwned, + { + let request_body_result = + self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await; + let request_body = match request_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body_result = self.wait_for_response_raw_by_id(id).await; + let response_body = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let subscription_id_result = parse_subscription_id_response(&response_body); + let subscription_id = match subscription_id_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + Ok(KhbbWsSubscribeCallOutput { + request_id: id, + method: std::string::String::from(kind.subscribe_method_name()), + subscription_id, + request_body, + response_body, + }) + } + + async fn read_next_raw_text_message( &mut self, ) -> core::result::Result, crate::KhbbError> { loop { @@ -345,208 +565,196 @@ impl KhbbSolanaWsRpcClient { } } - /// Performs a `slotSubscribe` call. - pub async fn slot_subscribe( + async fn wait_for_response_raw_by_id( &mut self, - id: u64, - ) -> core::result::Result { - self.subscribe_with_raw_response( - KhbbWsSubscriptionKind::Slot, - std::vec::Vec::::new(), - id, - ) - .await - } - - /// Performs a `logsSubscribe` call. - pub async fn logs_subscribe( - &mut self, - filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, - config: std::option::Option, - id: u64, - ) -> core::result::Result { - let params_result = build_logs_subscribe_params(filter, config); - let params = match params_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await - } - - /// Performs a `programSubscribe` call. - pub async fn program_subscribe( - &mut self, - program_id: &str, - config: std::option::Option, - id: u64, - ) -> core::result::Result { - let params_result = build_program_subscribe_params(program_id, config); - let params = match params_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id) - .await - } - - /// Performs a generic unsubscribe call. - pub async fn unsubscribe( - &mut self, - kind: KhbbWsSubscriptionKind, - subscription_id: u64, - id: u64, - ) -> core::result::Result { - let request_body_result = self - .send_json_rpc_request( - kind.unsubscribe_method_name(), - vec![serde_json::Value::Number(subscription_id.into())], - id, - ) - .await; - match request_body_result { - Ok(_) => {}, - Err(error) => { - return Err(error); - }, - } + expected_id: u64, + ) -> core::result::Result { + let mut deferred_messages = std::vec::Vec::::new(); loop { - let response_body_result = self.read_next_text_message().await; - let response_body_option = match response_body_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - let response_body = match response_body_option { - Some(value) => value, - None => { - return Err(crate::KhbbError::Runtime { - context: "read websocket unsubscribe response", - message: std::string::String::from( - "websocket stream ended before unsubscribe response", - ), - }); - }, - }; - let json_value_result = serde_json::from_str::(&response_body); - let json_value = match json_value_result { - Ok(value) => value, - Err(error) => { - return Err(crate::KhbbError::Json { - context: "decode websocket unsubscribe response as json value", - message: error.to_string(), - }); - }, - }; - let id_value_option = json_value.get("id"); - if id_value_option.is_none() { - continue; - } - let parsed_response_result = parse_json_rpc_response::(&response_body); - let parsed_response = match parsed_response_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - if let Some(error_value) = parsed_response.error { - let error_text_result = serde_json::to_string(&error_value); - let error_text = match error_text_result { - Ok(value) => value, - Err(error) => { - return Err(crate::KhbbError::Json { - context: "serialize websocket unsubscribe rpc error", - message: error.to_string(), + let pending_message_option = self.pending_incoming_messages.pop_front(); + if let Some(message) = pending_message_option { + match message { + KhbbWsIncomingMessage::Response { raw, id, .. } => { + if id == expected_id { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Ok(raw); + } + let parse_json_result = serde_json::from_str::(&raw); + let json_value = match parse_json_result { + Ok(value) => value, + Err(error) => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(crate::KhbbError::Json { + context: "reparse deferred websocket response", + message: error.to_string(), + }); + }, + }; + deferred_messages.push(KhbbWsIncomingMessage::Response { + raw, + id, + json: json_value, }); }, - }; - return Err(crate::KhbbError::Runtime { - context: "websocket unsubscribe returned rpc error", - message: error_text, - }); + KhbbWsIncomingMessage::Notification { raw, method, json } => { + deferred_messages.push(KhbbWsIncomingMessage::Notification { + raw, + method, + json, + }); + }, + KhbbWsIncomingMessage::Unknown { raw, json } => { + deferred_messages.push(KhbbWsIncomingMessage::Unknown { raw, json }); + }, + KhbbWsIncomingMessage::StreamEnded => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(crate::KhbbError::Runtime { + context: "wait for websocket response by id", + message: std::string::String::from( + "websocket stream ended while waiting for response", + ), + }); + }, + } + continue; } - match parsed_response.result { - Some(value) => return Ok(value), + let raw_message_result = self.read_next_raw_text_message().await; + let raw_message_option = match raw_message_result { + Ok(value) => value, + Err(error) => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(error); + }, + }; + let raw_message = match raw_message_option { + Some(value) => value, None => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } return Err(crate::KhbbError::Runtime { - context: "websocket unsubscribe returned empty result", - message: response_body, + context: "wait for websocket response by id", + message: std::string::String::from( + "websocket stream ended while waiting for response", + ), + }); + }, + }; + let classified_result = classify_incoming_message(&raw_message); + let classified_message = match classified_result { + Ok(value) => value, + Err(error) => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(error); + }, + }; + match classified_message { + KhbbWsIncomingMessage::Response { raw, id, .. } => { + if id == expected_id { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Ok(raw); + } + let parse_json_result = serde_json::from_str::(&raw); + let json_value = match parse_json_result { + Ok(value) => value, + Err(error) => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(crate::KhbbError::Json { + context: "reparse deferred websocket response", + message: error.to_string(), + }); + }, + }; + deferred_messages.push(KhbbWsIncomingMessage::Response { + raw, + id, + json: json_value, + }); + }, + KhbbWsIncomingMessage::Notification { raw, method, json } => { + deferred_messages.push(KhbbWsIncomingMessage::Notification { + raw, + method, + json, + }); + }, + KhbbWsIncomingMessage::Unknown { raw, json } => { + deferred_messages.push(KhbbWsIncomingMessage::Unknown { raw, json }); + }, + KhbbWsIncomingMessage::StreamEnded => { + while let Some(deferred_message) = deferred_messages.pop() { + self.pending_incoming_messages.push_front(deferred_message); + } + return Err(crate::KhbbError::Runtime { + context: "wait for websocket response by id", + message: std::string::String::from( + "websocket stream ended while waiting for response", + ), }); }, } } } +} - async fn subscribe_with_raw_response( - &mut self, - kind: KhbbWsSubscriptionKind, - params: TParams, - id: u64, - ) -> core::result::Result - where - TParams: serde::Serialize + serde::de::DeserializeOwned, - { - let request_body_result = - self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await; - let request_body = match request_body_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - loop { - let response_body_result = self.read_next_text_message().await; - let response_body_option = match response_body_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - let response_body = match response_body_option { - Some(value) => value, - None => { - return Err(crate::KhbbError::Runtime { - context: "read websocket subscribe response", - message: std::string::String::from( - "websocket stream ended before subscribe response", - ), - }); - }, - }; - let json_value_result = serde_json::from_str::(&response_body); - let json_value = match json_value_result { - Ok(value) => value, - Err(error) => { - return Err(crate::KhbbError::Json { - context: "decode websocket subscribe response as json value", - message: error.to_string(), - }); - }, - }; - let id_value_option = json_value.get("id"); - if id_value_option.is_none() { - continue; - } - let subscription_id_result = parse_subscription_id_response(&response_body); - let subscription_id = match subscription_id_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - return Ok(KhbbWsSubscribeCallOutput { - request_id: id, - method: std::string::String::from(kind.subscribe_method_name()), - subscription_id, - request_body, - response_body, +/// Classifies an incoming raw WebSocket JSON message. +pub(crate) fn classify_incoming_message( + raw_message: &str, +) -> core::result::Result { + let parse_result = serde_json::from_str::(raw_message); + let json_value = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "classify websocket incoming message", + message: error.to_string(), + }); + }, + }; + classify_incoming_message_from_json_value(raw_message, json_value) +} + +pub(crate) fn classify_incoming_message_from_json_value( + raw_message: &str, + json_value: serde_json::Value, +) -> core::result::Result { + let id_option = json_value.get("id"); + if let Some(id_value) = id_option { + let id_as_u64_option = id_value.as_u64(); + if let Some(id) = id_as_u64_option { + return Ok(KhbbWsIncomingMessage::Response { + raw: std::string::String::from(raw_message), + id, + json: json_value, }); } } + let method_option = json_value.get("method").and_then(serde_json::Value::as_str); + if let Some(method) = method_option { + return Ok(KhbbWsIncomingMessage::Notification { + raw: std::string::String::from(raw_message), + method: std::string::String::from(method), + json: json_value, + }); + } + Ok(KhbbWsIncomingMessage::Unknown { + raw: std::string::String::from(raw_message), + json: json_value, + }) } /// Parses a standard JSON-RPC response envelope. @@ -577,6 +785,7 @@ pub(crate) fn parse_slot_notification( let parse_result = serde_json::from_str::< KhbbWsNotificationEnvelope, >(response_body); + match parse_result { Ok(value) => Ok(value), Err(error) => Err(crate::KhbbError::Json { @@ -761,6 +970,38 @@ mod tests { ); } + #[test] + fn classify_incoming_message_accepts_response() { + let body = r#"{"jsonrpc":"2.0","id":7,"result":42}"#; + let result = super::classify_incoming_message(body); + assert!(result.is_ok()); + let message = result.expect("classify response"); + match message { + super::KhbbWsIncomingMessage::Response { id, .. } => { + assert_eq!(id, 7); + }, + _ => { + panic!("expected response"); + }, + } + } + + #[test] + fn classify_incoming_message_accepts_notification() { + let body = r#"{"jsonrpc":"2.0","method":"slotNotification","params":{"subscription":1,"result":{"slot":2,"parent":1,"root":1}}}"#; + let result = super::classify_incoming_message(body); + assert!(result.is_ok()); + let message = result.expect("classify notification"); + match message { + super::KhbbWsIncomingMessage::Notification { method, .. } => { + assert_eq!(method, "slotNotification"); + }, + _ => { + panic!("expected notification"); + }, + } + } + #[test] fn parse_subscription_id_response_accepts_success_payload() { let body = r#"{