// file: kb_lib/src/solana_pubsub_ws.rs //! Typed Solana WebSocket PubSub helpers built on top of the generic JSON-RPC //! transport. //! //! This module keeps the low-level transport and registry logic in //! [`crate::WsClient`] unchanged while adding: //! - typed `*_typed()` subscribe helpers using `solana_rpc_client_api::config` //! - typed notification decoding using `solana_rpc_client_api::response` /// Typed Solana PubSub notification payload. #[derive(Clone, Debug)] pub enum SolanaWsTypedNotification { /// `accountNotification` Account(solana_rpc_client_api::response::Response), /// `blockNotification` Block(solana_rpc_client_api::response::RpcBlockUpdate), /// `logsNotification` Logs( solana_rpc_client_api::response::Response, ), /// `programNotification` Program( solana_rpc_client_api::response::Response, ), /// `rootNotification` Root(u64), /// `signatureNotification` Signature( solana_rpc_client_api::response::Response< solana_rpc_client_api::response::RpcSignatureResult, >, ), /// `slotNotification` Slot(solana_rpc_client_api::response::SlotInfo), /// `slotsUpdatesNotification` SlotsUpdates(solana_rpc_client_api::response::SlotUpdate), /// `voteNotification` Vote(solana_rpc_client_api::response::RpcVote), } /// Parses a Solana JSON-RPC notification into an official typed payload. pub fn parse_solana_ws_typed_notification( notification: &crate::JsonRpcWsNotification, ) -> Result { if notification.method == "accountNotification" { let parse_result = serde_json::from_value::< solana_rpc_client_api::response::Response, >(notification.params.result.clone()); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Account(value)), Err(error) => Err(crate::Error::Json(format!( "cannot parse accountNotification payload: {error}" ))), }; } if notification.method == "blockNotification" { let parse_result = serde_json::from_value::( notification.params.result.clone(), ); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Block(value)), Err(error) => { Err(crate::Error::Json(format!("cannot parse blockNotification payload: {error}"))) }, }; } if notification.method == "logsNotification" { let parse_result = serde_json::from_value::< solana_rpc_client_api::response::Response< solana_rpc_client_api::response::RpcLogsResponse, >, >(notification.params.result.clone()); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Logs(value)), Err(error) => { Err(crate::Error::Json(format!("cannot parse logsNotification payload: {error}"))) }, }; } if notification.method == "programNotification" { let parse_result = serde_json::from_value::< solana_rpc_client_api::response::Response< solana_rpc_client_api::response::RpcKeyedAccount, >, >(notification.params.result.clone()); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Program(value)), Err(error) => Err(crate::Error::Json(format!( "cannot parse programNotification payload: {error}" ))), }; } if notification.method == "rootNotification" { let root_option = notification.params.result.as_u64(); return match root_option { Some(root) => Ok(SolanaWsTypedNotification::Root(root)), None => Err(crate::Error::Json( "cannot parse rootNotification payload: result is not a u64".to_string(), )), }; } if notification.method == "signatureNotification" { let parse_result = serde_json::from_value::< solana_rpc_client_api::response::Response< solana_rpc_client_api::response::RpcSignatureResult, >, >(notification.params.result.clone()); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Signature(value)), Err(error) => Err(crate::Error::Json(format!( "cannot parse signatureNotification payload: {error}" ))), }; } if notification.method == "slotNotification" { let parse_result = serde_json::from_value::( notification.params.result.clone(), ); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Slot(value)), Err(error) => { Err(crate::Error::Json(format!("cannot parse slotNotification payload: {error}"))) }, }; } if notification.method == "slotsUpdatesNotification" { let parse_result = serde_json::from_value::( notification.params.result.clone(), ); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::SlotsUpdates(value)), Err(error) => Err(crate::Error::Json(format!( "cannot parse slotsUpdatesNotification payload: {error}" ))), }; } if notification.method == "voteNotification" { let parse_result = serde_json::from_value::( notification.params.result.clone(), ); return match parse_result { Ok(value) => Ok(SolanaWsTypedNotification::Vote(value)), Err(error) => { Err(crate::Error::Json(format!("cannot parse voteNotification payload: {error}"))) }, }; } return Err(crate::Error::Json(format!( "unsupported Solana websocket notification method '{}'", notification.method ))); } /// Parses a typed Solana PubSub notification from a generic websocket event. /// /// This returns: /// - `Ok(Some(...))` for JSON-RPC notification-bearing events /// - `Ok(None)` for events that do not carry a notification /// - `Err(...)` when a notification is present but cannot be decoded pub fn parse_solana_ws_typed_notification_from_event( event: &crate::WsEvent, ) -> Result, crate::Error> { match event { crate::WsEvent::SubscriptionNotification { notification, .. } => { let parse_result = parse_solana_ws_typed_notification(notification); match parse_result { Ok(value) => return Ok(Some(value)), Err(error) => return Err(error), } }, crate::WsEvent::JsonRpcNotificationWithoutSubscription { notification, .. } => { let parse_result = parse_solana_ws_typed_notification(notification); match parse_result { Ok(value) => return Ok(Some(value)), Err(error) => return Err(error), } }, crate::WsEvent::JsonRpcMessage { message, .. } => match message { crate::JsonRpcWsIncomingMessage::Notification(notification) => { let parse_result = parse_solana_ws_typed_notification(notification); match parse_result { Ok(value) => return Ok(Some(value)), Err(error) => return Err(error), } }, crate::JsonRpcWsIncomingMessage::SuccessResponse(_) => return Ok(None), crate::JsonRpcWsIncomingMessage::ErrorResponse(_) => return Ok(None), }, _ => return Ok(None), } } impl crate::WsClient { /// Typed helper for `accountSubscribe`. pub async fn account_subscribe_typed( &self, pubkey: std::string::String, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_config_value(config, "accountSubscribe config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; return self.account_subscribe_raw(pubkey, config_value).await; } /// Typed helper for `blockSubscribe`. pub async fn block_subscribe_typed( &self, filter: solana_rpc_client_api::config::RpcBlockSubscribeFilter, config: std::option::Option, ) -> Result { let filter_value_result = serialize_required_value(filter, "blockSubscribe filter"); let filter_value = match filter_value_result { Ok(filter_value) => filter_value, Err(error) => return Err(error), }; let config_value_result = serialize_optional_config_value(config, "blockSubscribe config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; return self.block_subscribe_raw(filter_value, config_value).await; } /// Typed helper for `logsSubscribe`. pub async fn logs_subscribe_typed( &self, filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, config: std::option::Option, ) -> Result { let filter_value_result = serialize_required_value(filter, "logsSubscribe filter"); let filter_value = match filter_value_result { Ok(filter_value) => filter_value, Err(error) => return Err(error), }; let config_value_result = serialize_optional_config_value(config, "logsSubscribe config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; return self.logs_subscribe_raw(filter_value, config_value).await; } /// Typed helper for `programSubscribe`. pub async fn program_subscribe_typed( &self, program_id: std::string::String, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_config_value(config, "programSubscribe config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; return self.program_subscribe_raw(program_id, config_value).await; } /// Typed helper for `signatureSubscribe`. pub async fn signature_subscribe_typed( &self, signature: std::string::String, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_config_value(config, "signatureSubscribe config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; return self.signature_subscribe_raw(signature, config_value).await; } } fn serialize_required_value(value: T, label: &str) -> Result where T: serde::Serialize, { let value_result = serde_json::to_value(value); match value_result { Ok(value) => return Ok(value), Err(error) => { return Err(crate::Error::Json(format!("cannot serialize {}: {error}", label))); }, } } fn serialize_optional_config_value( value: std::option::Option, label: &str, ) -> Result, crate::Error> where T: serde::Serialize, { match value { Some(value) => { let value_result = serde_json::to_value(value); match value_result { Ok(value) => return Ok(Some(value)), Err(error) => { return Err(crate::Error::Json(format!("cannot serialize {}: {error}", label))); }, } }, None => return Ok(None), } } #[cfg(test)] mod tests { fn make_test_ws_client() -> crate::WsClient { let endpoint = crate::WsEndpointConfig { name: "test_ws".to_string(), enabled: true, provider: "test".to_string(), url: "ws://127.0.0.1:65535".to_string(), api_key_env_var: None, roles: vec!["test".to_string()], max_subscriptions: 8, connect_timeout_ms: 1000, request_timeout_ms: 1000, unsubscribe_timeout_ms: 500, write_channel_capacity: 8, event_channel_capacity: 32, auto_reconnect: false, }; return crate::WsClient::new(endpoint).expect("client creation must succeed"); } #[test] fn parse_root_notification_works() { let notification = crate::JsonRpcWsNotification { jsonrpc: "2.0".to_string(), method: "rootNotification".to_string(), params: crate::JsonRpcWsNotificationParams { result: serde_json::Value::from(123u64), subscription: 7, }, }; let parsed = crate::parse_solana_ws_typed_notification(¬ification) .expect("typed root notification parse must succeed"); match parsed { crate::SolanaWsTypedNotification::Root(root) => { assert_eq!(root, 123); }, other => { panic!("unexpected parsed notification: {other:?}"); }, } } #[test] fn parse_slot_notification_works() { let notification = crate::JsonRpcWsNotification { jsonrpc: "2.0".to_string(), method: "slotNotification".to_string(), params: crate::JsonRpcWsNotificationParams { result: serde_json::json!({ "parent": 10, "root": 11, "slot": 12 }), subscription: 8, }, }; let parsed = crate::parse_solana_ws_typed_notification(¬ification) .expect("typed slot notification parse must succeed"); match parsed { crate::SolanaWsTypedNotification::Slot(slot_info) => { assert_eq!(slot_info.parent, 10); assert_eq!(slot_info.root, 11); assert_eq!(slot_info.slot, 12); }, other => { panic!("unexpected parsed notification: {other:?}"); }, } } #[tokio::test] async fn account_subscribe_typed_uses_same_send_path() { let client = make_test_ws_client(); let result = client.account_subscribe_typed(crate::SYSTEM_PROGRAM_ID.to_string(), None).await; match result { Err(crate::Error::NotConnected(_)) => {}, other => { panic!("unexpected result: {other:?}"); }, } } #[tokio::test] async fn signature_subscribe_typed_uses_same_send_path() { let client = make_test_ws_client(); let result = client .signature_subscribe_typed( "2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b".to_string(), None, ) .await; match result { Err(crate::Error::NotConnected(_)) => {}, other => { panic!("unexpected result: {other:?}"); }, } } }