diff --git a/Cargo.toml b/Cargo.toml index 50b0a7d..9824a41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.3.2" +version = "0.4.1" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index f83270c..2f2d771 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -17,6 +17,7 @@ mod listener; mod solana_rpc_http; mod storage; mod tracing_setup; +mod solana_rpc_ws; /// Runs the listener application bootstrap workflow. pub use crate::app::run_listener_app; @@ -44,3 +45,21 @@ pub use crate::storage::create_sqlite_pool; pub use crate::storage::ensure_sqlite_schema; /// Initializes the tracing subscriber for the process. pub use crate::tracing_setup::init_tracing; +/// Minimal Solana WebSocket JSON-RPC client. +pub use crate::solana_rpc_ws::KhbbSolanaWsRpcClient; +/// Configuration of the minimal Solana WebSocket RPC client. +pub use crate::solana_rpc_ws::KhbbSolanaWsRpcClientConfig; +/// Supported initial WebSocket subscription kinds. +pub use crate::solana_rpc_ws::KhbbWsSubscriptionKind; +/// Handle returned after a successful WebSocket subscription. +pub use crate::solana_rpc_ws::KhbbWsSubscriptionHandle; +/// Output of a successful WebSocket subscribe call. +pub use crate::solana_rpc_ws::KhbbWsSubscribeCallOutput; +/// Minimal JSON-RPC request envelope for WebSocket RPC calls. +pub use crate::solana_rpc_ws::KhbbWsJsonRpcRequestEnvelope; +/// Minimal JSON-RPC response envelope for WebSocket RPC calls. +pub use crate::solana_rpc_ws::KhbbWsJsonRpcResponseEnvelope; +/// Notification envelope used by Solana PubSub notifications. +pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope; +/// Notification params envelope used by Solana PubSub notifications. +pub use crate::solana_rpc_ws::KhbbWsNotificationParams; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index abc4ffa..f6e7644 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -26,21 +26,100 @@ pub async fn run_listener_runtime( let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms); let mut interval = tokio::time::interval(tick_duration); let mut tick_count: u64 = 0; - let http_client_config = crate::KhbbSolanaHttpRpcClientConfig { - url: config.solana_http_rpc_url.clone(), - }; - let http_client_result = - crate::KhbbSolanaHttpRpcClient::new(http_client_config); + let http_client_config = + crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() }; + let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config); let http_client = match http_client_result { Ok(value) => value, Err(error) => { return Err(error); - } + }, }; + let ws_client_config = + crate::KhbbSolanaWsRpcClientConfig { url: config.solana_ws_rpc_url.clone() }; + + let ws_client_result = crate::KhbbSolanaWsRpcClient::new(ws_client_config); + + let mut ws_client = match ws_client_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let ws_connect_result = ws_client.connect().await; + match ws_connect_result { + Ok(()) => { + tracing::info!( + listener_session_id = session.id, + solana_ws_rpc_url = %config.solana_ws_rpc_url, + "websocket rpc client connected" + ); + }, + Err(error) => { + return Err(error); + }, + } + let slot_subscribe_result = ws_client.slot_subscribe(1).await; + let slot_subscribe_output = match slot_subscribe_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "outgoing", + &slot_subscribe_output.request_body, + ) + .await; + match insert_ws_outgoing_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store outgoing websocket subscribe request" + ); + }, + } + let insert_ws_incoming_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &slot_subscribe_output.response_body, + ) + .await; + match insert_ws_incoming_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming websocket subscribe response" + ); + }, + } + let slot_subscription_handle = crate::KhbbWsSubscriptionHandle { + request_id: slot_subscribe_output.request_id, + subscription_id: slot_subscribe_output.subscription_id, + kind: crate::KhbbWsSubscriptionKind::Slot, + }; + tracing::info!( + listener_session_id = session.id, + request_id = slot_subscription_handle.request_id, + subscription_id = slot_subscription_handle.subscription_id, + "slot websocket subscription established" + ); loop { tokio::select! { _ = interval.tick() => { tick_count = tick_count.saturating_add(1); + tracing::trace!( + listener_session_id = session.id, + tick_count = tick_count, + "listener runtime heartbeat" + ); let slot_result = http_client.get_slot(tick_count as u64).await; match slot_result { Ok(call_output) => { @@ -64,6 +143,14 @@ pub async fn run_listener_runtime( error = %error, "failed to insert raw http rpc message" ); + } else { + tracing::trace!( + listener_session_id = session.id, + request_id = call_output.request_id, + method = %call_output.method, + status = status, + "raw http rpc message stored" + ); } } Err(error) => { @@ -73,11 +160,55 @@ pub async fn run_listener_runtime( ); } } - tracing::trace!( - listener_session_id = session.id, - tick_count = tick_count, - "listener runtime heartbeat" - ); + let ws_read_timeout_result = tokio::time::timeout( + std::time::Duration::from_millis(50), + ws_client.read_next_text_message(), + ) + .await; + match ws_read_timeout_result { + Ok(read_result) => { + match read_result { + Ok(Some(message_text)) => { + let insert_ws_message_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &message_text, + ) + .await; + match insert_ws_message_result { + Ok(()) => { + tracing::trace!( + listener_session_id = session.id, + "raw websocket message stored" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming websocket message" + ); + } + } + } + Ok(None) => { + tracing::info!( + listener_session_id = session.id, + "websocket stream ended" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to read websocket message" + ); + } + } + } + Err(_) => {} + } if tick_count >= 3 { break; } @@ -116,6 +247,44 @@ pub async fn run_listener_runtime( } } } + let unsubscribe_result = ws_client + .unsubscribe( + slot_subscription_handle.kind, + slot_subscription_handle.subscription_id, + tick_count.saturating_add(10), + ) + .await; + match unsubscribe_result { + Ok(value) => { + tracing::info!( + listener_session_id = session.id, + unsubscribed = value, + subscription_id = slot_subscription_handle.subscription_id, + "slot websocket subscription cancelled" + ); + }, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + subscription_id = slot_subscription_handle.subscription_id, + "failed to cancel slot websocket subscription" + ); + }, + } + let ws_close_result = ws_client.close().await; + match ws_close_result { + Ok(()) => { + tracing::info!(listener_session_id = session.id, "websocket rpc client closed"); + }, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to close websocket rpc client" + ); + }, + } let status_update_result = crate::storage::update_listener_session_status(pool, session.id, "stopped").await; match status_update_result { diff --git a/khbb_lib/src/solana_rpc_ws.rs b/khbb_lib/src/solana_rpc_ws.rs new file mode 100644 index 0000000..c0559c6 --- /dev/null +++ b/khbb_lib/src/solana_rpc_ws.rs @@ -0,0 +1,796 @@ +// file: khbb_lib/src/solana_rpc_ws.rs +//! Minimal Solana WebSocket JSON-RPC client. +//! +//! This module keeps full control over the WebSocket transport and JSON-RPC +//! envelopes while reusing official Solana RPC config and response types where +//! they exist. + +use futures_util::SinkExt; +use futures_util::StreamExt; + +/// Configuration of the minimal Solana WebSocket RPC client. +#[derive(Debug, Clone)] +pub struct KhbbSolanaWsRpcClientConfig { + /// Base WebSocket RPC endpoint. + pub url: std::string::String, +} + +/// Supported WebSocket subscription kinds for the initial implementation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum KhbbWsSubscriptionKind { + /// `slotSubscribe` + Slot, + /// `logsSubscribe` + Logs, + /// `programSubscribe` + Program, +} + +impl KhbbWsSubscriptionKind { + /// Returns the JSON-RPC subscribe method name. + pub(crate) fn subscribe_method_name(&self) -> &'static str { + match self { + Self::Slot => "slotSubscribe", + Self::Logs => "logsSubscribe", + Self::Program => "programSubscribe", + } + } + + /// Returns the JSON-RPC unsubscribe method name. + pub(crate) fn unsubscribe_method_name(&self) -> &'static str { + match self { + Self::Slot => "slotUnsubscribe", + Self::Logs => "logsUnsubscribe", + Self::Program => "programUnsubscribe", + } + } +} + +/// Minimal JSON-RPC request envelope for WebSocket RPC calls. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(bound( + serialize = "TParams: serde::Serialize", + deserialize = "TParams: serde::de::DeserializeOwned" +))] +pub struct KhbbWsJsonRpcRequestEnvelope +where + TParams: serde::Serialize + serde::de::DeserializeOwned, +{ + /// JSON-RPC protocol version. + pub jsonrpc: std::string::String, + /// RPC method name. + pub method: std::string::String, + /// RPC parameters. + pub params: TParams, + /// Request identifier. + pub id: u64, +} + +/// Minimal JSON-RPC response envelope for WebSocket RPC calls. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(bound( + serialize = "TResult: serde::Serialize", + deserialize = "TResult: serde::de::DeserializeOwned" +))] +pub struct KhbbWsJsonRpcResponseEnvelope +where + TResult: serde::Serialize + serde::de::DeserializeOwned, +{ + /// JSON-RPC protocol version. + pub jsonrpc: std::string::String, + /// Successful result if present. + pub result: std::option::Option, + /// Raw RPC error object if present. + pub error: std::option::Option, + /// Response identifier. + pub id: serde_json::Value, +} + +/// Notification params envelope used by Solana PubSub notifications. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(bound( + serialize = "TResult: serde::Serialize", + deserialize = "TResult: serde::de::DeserializeOwned" +))] +pub struct KhbbWsNotificationParams +where + TResult: serde::Serialize + serde::de::DeserializeOwned, +{ + /// Notification result payload. + pub result: TResult, + /// Subscription identifier. + pub subscription: u64, +} + +/// Notification envelope used by Solana PubSub notifications. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(bound( + serialize = "TResult: serde::Serialize", + deserialize = "TResult: serde::de::DeserializeOwned" +))] +pub struct KhbbWsNotificationEnvelope +where + TResult: serde::Serialize + serde::de::DeserializeOwned, +{ + /// JSON-RPC protocol version. + pub jsonrpc: std::string::String, + /// Notification method name. + pub method: std::string::String, + /// Notification parameters. + pub params: KhbbWsNotificationParams, +} + +/// Handle returned after a successful subscription. +#[derive(Debug, Clone)] +pub struct KhbbWsSubscriptionHandle { + /// Client-side request identifier used for the subscribe call. + pub request_id: u64, + /// Server-side subscription identifier. + pub subscription_id: u64, + /// Subscription kind. + pub kind: KhbbWsSubscriptionKind, +} + +/// Output of a successful WebSocket subscribe call. +#[derive(Debug, Clone)] +pub struct KhbbWsSubscribeCallOutput { + /// Client-side request identifier used for the subscribe call. + pub request_id: u64, + /// Subscribe method name. + pub method: std::string::String, + /// Server-side subscription identifier. + pub subscription_id: u64, + /// Raw request JSON sent over the socket. + pub request_body: std::string::String, + /// Raw response JSON received from the socket. + pub response_body: std::string::String, +} + +/// Minimal Solana WebSocket JSON-RPC client. +#[derive(Debug)] +pub struct KhbbSolanaWsRpcClient { + /// Client configuration. + pub(crate) config: KhbbSolanaWsRpcClientConfig, + /// Connected WebSocket stream if connected. + pub(crate) ws_stream: std::option::Option< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, +} + +impl KhbbSolanaWsRpcClient { + /// Creates a new minimal Solana WebSocket JSON-RPC client. + pub fn new( + config: KhbbSolanaWsRpcClientConfig, + ) -> core::result::Result { + if config.url.trim().is_empty() { + return Err(crate::KhbbError::Config { + message: std::string::String::from( + "solana websocket rpc client url must not be empty", + ), + }); + } + Ok(Self { config, ws_stream: None }) + } + + /// Connects the WebSocket client to the configured endpoint. + pub async fn connect(&mut self) -> core::result::Result<(), crate::KhbbError> { + if self.ws_stream.is_some() { + return Err(crate::KhbbError::Runtime { + context: "connect websocket client", + message: std::string::String::from("websocket client is already connected"), + }); + } + let connect_result = tokio_tungstenite::connect_async(&self.config.url).await; + let (ws_stream, _) = match connect_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "connect websocket client", + message: error.to_string(), + }); + }, + }; + self.ws_stream = Some(ws_stream); + Ok(()) + } + + /// Closes the current WebSocket connection if connected. + pub async fn close(&mut self) -> core::result::Result<(), crate::KhbbError> { + if self.ws_stream.is_none() { + return Ok(()); + } + let ws_stream_option = self.ws_stream.as_mut(); + let ws_stream = match ws_stream_option { + Some(value) => value, + None => { + return Ok(()); + }, + }; + let close_result = ws_stream.close(None).await; + match close_result { + Ok(()) => { + self.ws_stream = None; + Ok(()) + }, + Err(error) => Err(crate::KhbbError::Runtime { + context: "close websocket client", + message: error.to_string(), + }), + } + } + + /// Sends a generic JSON-RPC request over the current WebSocket connection. + pub async fn send_json_rpc_request( + &mut self, + method: &str, + params: TParams, + id: u64, + ) -> core::result::Result + where + TParams: serde::Serialize + serde::de::DeserializeOwned, + { + let request = KhbbWsJsonRpcRequestEnvelope { + jsonrpc: std::string::String::from("2.0"), + method: std::string::String::from(method), + params, + id, + }; + let request_body_result = serde_json::to_string(&request); + let request_body = match request_body_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "serialize websocket json rpc request", + message: error.to_string(), + }); + }, + }; + let ws_stream_option = self.ws_stream.as_mut(); + let ws_stream = match ws_stream_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "send websocket json rpc request", + message: std::string::String::from("websocket client is not connected"), + }); + }, + }; + let send_result = ws_stream + .send(tokio_tungstenite::tungstenite::Message::Text(request_body.clone().into())) + .await; + match send_result { + Ok(()) => Ok(request_body), + Err(error) => Err(crate::KhbbError::Runtime { + context: "send websocket json rpc request", + message: error.to_string(), + }), + } + } + + /// Reads the next text message from the socket. + /// + /// This method skips ping and pong frames, answers ping with pong, and + /// returns `Ok(None)` on close or end-of-stream. + pub async fn read_next_text_message( + &mut self, + ) -> core::result::Result, crate::KhbbError> { + loop { + let ws_stream_option = self.ws_stream.as_mut(); + let ws_stream = match ws_stream_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "read websocket message", + message: std::string::String::from("websocket client is not connected"), + }); + }, + }; + let next_result = ws_stream.next().await; + let message_result = match next_result { + Some(value) => value, + None => { + return Ok(None); + }, + }; + let message = match message_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "read websocket message", + message: error.to_string(), + }); + }, + }; + match message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + return Ok(Some(text.to_string())); + }, + tokio_tungstenite::tungstenite::Message::Binary(binary) => { + let utf8_result = std::string::String::from_utf8(binary.to_vec()); + match utf8_result { + Ok(value) => { + return Ok(Some(value)); + }, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "decode websocket binary message as utf-8", + message: error.to_string(), + }); + }, + } + }, + tokio_tungstenite::tungstenite::Message::Ping(payload) => { + let pong_result = ws_stream + .send(tokio_tungstenite::tungstenite::Message::Pong(payload)) + .await; + match pong_result { + Ok(()) => {}, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "send websocket pong frame", + message: error.to_string(), + }); + }, + } + }, + tokio_tungstenite::tungstenite::Message::Pong(_) => {}, + tokio_tungstenite::tungstenite::Message::Close(_) => { + return Ok(None); + }, + tokio_tungstenite::tungstenite::Message::Frame(_) => {}, + } + } + } + + /// Performs a `slotSubscribe` call. + pub async fn slot_subscribe( + &mut self, + id: u64, + ) -> core::result::Result { + self.subscribe_with_raw_response( + KhbbWsSubscriptionKind::Slot, + std::vec::Vec::::new(), + id, + ) + .await + } + + /// Performs a `logsSubscribe` call. + pub async fn logs_subscribe( + &mut self, + filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, + config: std::option::Option, + id: u64, + ) -> core::result::Result { + let params_result = build_logs_subscribe_params(filter, config); + let params = match params_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await + } + + /// Performs a `programSubscribe` call. + pub async fn program_subscribe( + &mut self, + program_id: &str, + config: std::option::Option, + id: u64, + ) -> core::result::Result { + let params_result = build_program_subscribe_params(program_id, config); + let params = match params_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id) + .await + } + + /// Performs a generic unsubscribe call. + pub async fn unsubscribe( + &mut self, + kind: KhbbWsSubscriptionKind, + subscription_id: u64, + id: u64, + ) -> core::result::Result { + let request_body_result = self + .send_json_rpc_request( + kind.unsubscribe_method_name(), + vec![serde_json::Value::Number(subscription_id.into())], + id, + ) + .await; + match request_body_result { + Ok(_) => {}, + Err(error) => { + return Err(error); + }, + } + let response_body_result = self.read_next_text_message().await; + let response_body_option = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body = match response_body_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "read websocket unsubscribe response", + message: std::string::String::from("websocket stream ended before response"), + }); + }, + }; + let parse_result = parse_json_rpc_response::(&response_body); + let parsed_response = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + if let Some(error_value) = parsed_response.error { + let error_text_result = serde_json::to_string(&error_value); + let error_text = match error_text_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "serialize websocket unsubscribe rpc error", + message: error.to_string(), + }); + } + }; + return Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned rpc error", + message: error_text, + }); + } + match parsed_response.result { + Some(value) => Ok(value), + None => Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned empty result", + message: response_body, + }), + } + } + + async fn subscribe_with_raw_response( + &mut self, + kind: KhbbWsSubscriptionKind, + params: TParams, + id: u64, + ) -> core::result::Result + where + TParams: serde::Serialize + serde::de::DeserializeOwned, + { + let request_body_result = + self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await; + let request_body = match request_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body_result = self.read_next_text_message().await; + let response_body_option = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body = match response_body_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "read websocket subscribe response", + message: std::string::String::from("websocket stream ended before response"), + }); + }, + }; + let subscription_id_result = parse_subscription_id_response(&response_body); + let subscription_id = match subscription_id_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + Ok(KhbbWsSubscribeCallOutput { + request_id: id, + method: std::string::String::from(kind.subscribe_method_name()), + subscription_id, + request_body, + response_body, + }) + } +} + +/// Parses a standard JSON-RPC response envelope. +pub(crate) fn parse_json_rpc_response( + response_body: &str, +) -> core::result::Result, crate::KhbbError> +where + TResult: serde::Serialize + serde::de::DeserializeOwned, +{ + let parse_result = + serde_json::from_str::>(response_body); + + match parse_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "parse websocket json rpc response", + message: error.to_string(), + }), + } +} + +/// Parses a `slotNotification` payload. +pub(crate) fn parse_slot_notification( + response_body: &str, +) -> core::result::Result< + KhbbWsNotificationEnvelope, + crate::KhbbError, +> { + let parse_result = serde_json::from_str::< + KhbbWsNotificationEnvelope, + >(response_body); + match parse_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "parse websocket slot notification", + message: error.to_string(), + }), + } +} + +/// Parses a `logsNotification` payload. +pub(crate) fn parse_logs_notification( + response_body: &str, +) -> core::result::Result< + KhbbWsNotificationEnvelope< + solana_rpc_client_api::response::Response, + >, + crate::KhbbError, +> { + let parse_result = serde_json::from_str::< + KhbbWsNotificationEnvelope< + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcLogsResponse, + >, + >, + >(response_body); + match parse_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "parse websocket logs notification", + message: error.to_string(), + }), + } +} + +/// Parses a `programNotification` payload. +pub(crate) fn parse_program_notification( + response_body: &str, +) -> core::result::Result< + KhbbWsNotificationEnvelope< + solana_rpc_client_api::response::Response, + >, + crate::KhbbError, +> { + let parse_result = serde_json::from_str::< + KhbbWsNotificationEnvelope< + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcKeyedAccount, + >, + >, + >(response_body); + match parse_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "parse websocket program notification", + message: error.to_string(), + }), + } +} + +/// Parses a subscribe response and extracts the server-side subscription id. +pub(crate) fn parse_subscription_id_response( + response_body: &str, +) -> core::result::Result { + let parse_result = parse_json_rpc_response::(response_body); + let parsed_response = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + if let Some(error_value) = parsed_response.error { + let error_text_result = serde_json::to_string(&error_value); + let error_text = match error_text_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "serialize websocket subscribe rpc error", + message: error.to_string(), + }); + } + }; + return Err(crate::KhbbError::Runtime { + context: "websocket subscribe returned rpc error", + message: error_text, + }); + } + match parsed_response.result { + Some(value) => Ok(value), + None => Err(crate::KhbbError::Runtime { + context: "websocket subscribe returned empty result", + message: response_body.to_string(), + }), + } +} + +/// Builds the JSON-RPC params array for `logsSubscribe`. +pub(crate) fn build_logs_subscribe_params( + filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, + config: std::option::Option, +) -> core::result::Result { + if config.is_some() { + let to_value_result = serde_json::to_value((filter, config)); + match to_value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "serialize websocket logsSubscribe params", + message: error.to_string(), + }), + } + } else { + let to_value_result = serde_json::to_value((filter,)); + match to_value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "serialize websocket logsSubscribe params", + message: error.to_string(), + }), + } + } +} + +/// Builds the JSON-RPC params array for `programSubscribe`. +pub(crate) fn build_program_subscribe_params( + program_id: &str, + config: std::option::Option, +) -> core::result::Result { + if program_id.trim().is_empty() { + return Err(crate::KhbbError::Config { + message: std::string::String::from("programSubscribe program_id must not be empty"), + }); + } + if config.is_some() { + let to_value_result = serde_json::to_value((program_id, config)); + match to_value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "serialize websocket programSubscribe params", + message: error.to_string(), + }), + } + } else { + let to_value_result = serde_json::to_value((program_id,)); + match to_value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KhbbError::Json { + context: "serialize websocket programSubscribe params", + message: error.to_string(), + }), + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn new_rejects_empty_url() { + let config = super::KhbbSolanaWsRpcClientConfig { url: std::string::String::new() }; + let result = super::KhbbSolanaWsRpcClient::new(config); + assert!(result.is_err()); + } + + #[test] + fn new_accepts_valid_url() { + let config = super::KhbbSolanaWsRpcClientConfig { + url: std::string::String::from("wss://example.invalid"), + }; + let result = super::KhbbSolanaWsRpcClient::new(config); + assert!(result.is_ok()); + } + + #[test] + fn subscription_kind_method_names_are_stable() { + assert_eq!(super::KhbbWsSubscriptionKind::Slot.subscribe_method_name(), "slotSubscribe"); + assert_eq!( + super::KhbbWsSubscriptionKind::Slot.unsubscribe_method_name(), + "slotUnsubscribe" + ); + assert_eq!(super::KhbbWsSubscriptionKind::Logs.subscribe_method_name(), "logsSubscribe"); + assert_eq!( + super::KhbbWsSubscriptionKind::Program.subscribe_method_name(), + "programSubscribe" + ); + } + + #[test] + fn parse_subscription_id_response_accepts_success_payload() { + let body = r#"{ + "jsonrpc":"2.0", + "result":42, + "id":7 + }"#; + let result = super::parse_subscription_id_response(body); + assert!(result.is_ok()); + assert_eq!(result.expect("subscription id"), 42); + } + + #[test] + fn parse_subscription_id_response_rejects_error_payload() { + let body = r#"{ + "jsonrpc":"2.0", + "error":{"code":-32000,"message":"failure"}, + "id":7 + }"#; + let result = super::parse_subscription_id_response(body); + assert!(result.is_err()); + } + + #[test] + fn parse_slot_notification_accepts_valid_payload() { + let body = r#"{ + "jsonrpc":"2.0", + "method":"slotNotification", + "params":{ + "result":{ + "parent":75, + "root":44, + "slot":76 + }, + "subscription":3 + } + }"#; + let result = super::parse_slot_notification(body); + assert!(result.is_ok()); + let notification = result.expect("slot notification"); + assert_eq!(notification.method, "slotNotification"); + assert_eq!(notification.params.subscription, 3); + assert_eq!(notification.params.result.slot, 76); + } + + #[test] + fn build_logs_subscribe_params_accepts_filter_only() { + let result = super::build_logs_subscribe_params( + solana_rpc_client_api::config::RpcTransactionLogsFilter::All, + None, + ); + assert!(result.is_ok()); + let params = result.expect("logs params"); + assert!(params.is_array()); + } + + #[test] + fn build_program_subscribe_params_rejects_empty_program_id() { + let result = super::build_program_subscribe_params("", None); + assert!(result.is_err()); + } + + #[test] + fn build_program_subscribe_params_accepts_program_id_only() { + let result = + super::build_program_subscribe_params("11111111111111111111111111111111", None); + assert!(result.is_ok()); + let params = result.expect("program params"); + assert!(params.is_array()); + } +} diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs index c4a07da..ff1e480 100644 --- a/khbb_lib/src/storage.rs +++ b/khbb_lib/src/storage.rs @@ -323,6 +323,38 @@ VALUES (?, ?, ?, ?, ?, ?, ?) } } +pub(crate) async fn insert_raw_ws_message( + pool: &sqlx::SqlitePool, + session_id: i64, + direction: &str, + message_text: &str, +) -> core::result::Result<(), crate::KhbbError> { + let now = chrono::Utc::now().to_rfc3339(); + let insert_result = sqlx::query( + r#" +INSERT INTO raw_ws_messages ( + listener_session_id, + direction, + message_text, + created_at +) VALUES (?1, ?2, ?3, ?4); +"#, + ) + .bind(session_id) + .bind(direction) + .bind(message_text) + .bind(now) + .execute(pool) + .await; + match insert_result { + Ok(_) => Ok(()), + Err(error) => Err(crate::KhbbError::Database { + context: "insert raw websocket message", + message: error.to_string(), + }), + } +} + #[cfg(test)] mod tests { use super::*; @@ -473,4 +505,26 @@ WHERE id = ?1; insert_raw_http_rpc_message(&pool, session.id, 1, "getSlot", "{}", "{}", "ok").await; assert!(result.is_ok()); } + + #[tokio::test] + async fn insert_raw_ws_message_inserts_row() { + let database_url = build_temp_sqlite_url(); + let pool_result = crate::create_sqlite_pool(&database_url).await; + assert!(pool_result.is_ok()); + let pool = pool_result.expect("create sqlite pool"); + let schema_result = crate::ensure_sqlite_schema(&pool).await; + assert!(schema_result.is_ok()); + let config = build_test_config(database_url); + let insert_session_result = super::insert_listener_session(&pool, &config).await; + assert!(insert_session_result.is_ok()); + let session = insert_session_result.expect("insert listener session"); + let insert_ws_result = super::insert_raw_ws_message( + &pool, + session.id, + "incoming", + r#"{"jsonrpc":"2.0","method":"slotNotification"}"#, + ) + .await; + assert!(insert_ws_result.is_ok()); + } }