diff --git a/CHANGELOG.md b/CHANGELOG.md index cae8333..b5dca17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,3 +7,4 @@ 0.2.0 - Couche JSON-RPC WS Solana 0.3.0 - Registre subscriptions / notifications 0.3.1 - Ajout de subscribe/unsubscribe hlpers à WsClient +0.3.2 - Ajout de notifications typés diff --git a/Cargo.toml b/Cargo.toml index 57980b2..9003083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.3.1" +version = "0.3.2" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index ef5a69b..acb01da 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -16,6 +16,7 @@ mod rpc_ws; mod tracing; mod types; mod ws_client; +mod rpc_ws_solana; pub use crate::config::KbAppConfig; pub use crate::config::KbConfig; @@ -44,3 +45,7 @@ pub use crate::ws_client::WsClient; pub use crate::ws_client::WsEvent; pub use crate::ws_client::WsOutgoingMessage; pub use crate::ws_client::WsSubscriptionInfo; +pub use crate::rpc_ws_solana::KbSolanaWsTypedNotification; +pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification; +pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification_from_event; + diff --git a/kb_lib/src/rpc_ws_solana.rs b/kb_lib/src/rpc_ws_solana.rs new file mode 100644 index 0000000..b927813 --- /dev/null +++ b/kb_lib/src/rpc_ws_solana.rs @@ -0,0 +1,422 @@ +// file: kb_lib/src/rpc_ws_solana.rs + +//! Typed Solana WebSocket PubSub helpers built on top of the generic JSON-RPC +//! transport. +//! +//! This module keeps the low-level transport and registry logic in +//! [`crate::WsClient`] unchanged while adding: +//! - typed `*_typed()` subscribe helpers using `solana_rpc_client_api::config` +//! - typed notification decoding using `solana_rpc_client_api::response` + +/// Typed Solana PubSub notification payload. +#[derive(Clone, Debug)] +pub enum KbSolanaWsTypedNotification { + /// `accountNotification` + Account(solana_rpc_client_api::response::Response), + /// `blockNotification` + Block(solana_rpc_client_api::response::RpcBlockUpdate), + /// `logsNotification` + Logs( + solana_rpc_client_api::response::Response, + ), + /// `programNotification` + Program( + solana_rpc_client_api::response::Response, + ), + /// `rootNotification` + Root(u64), + /// `signatureNotification` + Signature( + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcSignatureResult, + >, + ), + /// `slotNotification` + Slot(solana_rpc_client_api::response::SlotInfo), + /// `slotsUpdatesNotification` + SlotsUpdates(solana_rpc_client_api::response::SlotUpdate), + /// `voteNotification` + Vote(solana_rpc_client_api::response::RpcVote), +} + +/// Parses a Solana JSON-RPC notification into an official typed payload. +pub fn parse_kb_solana_ws_typed_notification( + notification: &crate::KbJsonRpcWsNotification, +) -> Result { + if notification.method == "accountNotification" { + let parse_result = serde_json::from_value::< + solana_rpc_client_api::response::Response, + >(notification.params.result.clone()); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Account(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse accountNotification payload: {error}" + ))), + }; + } + if notification.method == "blockNotification" { + let parse_result = serde_json::from_value::( + notification.params.result.clone(), + ); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Block(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse blockNotification payload: {error}" + ))), + }; + } + if notification.method == "logsNotification" { + let parse_result = serde_json::from_value::< + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcLogsResponse, + >, + >(notification.params.result.clone()); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Logs(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse logsNotification payload: {error}" + ))), + }; + } + if notification.method == "programNotification" { + let parse_result = serde_json::from_value::< + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcKeyedAccount, + >, + >(notification.params.result.clone()); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Program(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse programNotification payload: {error}" + ))), + }; + } + if notification.method == "rootNotification" { + let root_option = notification.params.result.as_u64(); + return match root_option { + Some(root) => Ok(KbSolanaWsTypedNotification::Root(root)), + None => Err(crate::KbError::Json( + "cannot parse rootNotification payload: result is not a u64".to_string(), + )), + }; + } + if notification.method == "signatureNotification" { + let parse_result = serde_json::from_value::< + solana_rpc_client_api::response::Response< + solana_rpc_client_api::response::RpcSignatureResult, + >, + >(notification.params.result.clone()); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Signature(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse signatureNotification payload: {error}" + ))), + }; + } + if notification.method == "slotNotification" { + let parse_result = serde_json::from_value::( + notification.params.result.clone(), + ); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Slot(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse slotNotification payload: {error}" + ))), + }; + } + if notification.method == "slotsUpdatesNotification" { + let parse_result = serde_json::from_value::( + notification.params.result.clone(), + ); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::SlotsUpdates(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse slotsUpdatesNotification payload: {error}" + ))), + }; + } + if notification.method == "voteNotification" { + let parse_result = serde_json::from_value::( + notification.params.result.clone(), + ); + return match parse_result { + Ok(value) => Ok(KbSolanaWsTypedNotification::Vote(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse voteNotification payload: {error}" + ))), + }; + } + Err(crate::KbError::Json(format!( + "unsupported Solana websocket notification method '{}'", + notification.method + ))) +} + +/// Parses a typed Solana PubSub notification from a generic websocket event. +/// +/// This returns: +/// - `Ok(Some(...))` for JSON-RPC notification-bearing events +/// - `Ok(None)` for events that do not carry a notification +/// - `Err(...)` when a notification is present but cannot be decoded +pub fn parse_kb_solana_ws_typed_notification_from_event( + event: &crate::WsEvent, +) -> Result, crate::KbError> { + match event { + crate::WsEvent::SubscriptionNotification { notification, .. } => { + let parse_result = parse_kb_solana_ws_typed_notification(notification); + match parse_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(error), + } + } + crate::WsEvent::JsonRpcNotificationWithoutSubscription { notification, .. } => { + let parse_result = parse_kb_solana_ws_typed_notification(notification); + match parse_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(error), + } + } + crate::WsEvent::JsonRpcMessage { message, .. } => match message { + crate::KbJsonRpcWsIncomingMessage::Notification(notification) => { + let parse_result = parse_kb_solana_ws_typed_notification(notification); + match parse_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(error), + } + } + crate::KbJsonRpcWsIncomingMessage::SuccessResponse(_) => Ok(None), + crate::KbJsonRpcWsIncomingMessage::ErrorResponse(_) => Ok(None), + }, + _ => Ok(None), + } +} + +impl crate::WsClient { + /// Typed helper for `accountSubscribe`. + pub async fn account_subscribe_typed( + &self, + pubkey: std::string::String, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_serialize_optional_config_value(config, "accountSubscribe config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + self.account_subscribe(pubkey, config_value).await + } + + /// Typed helper for `blockSubscribe`. + pub async fn block_subscribe_typed( + &self, + filter: solana_rpc_client_api::config::RpcBlockSubscribeFilter, + config: std::option::Option, + ) -> Result { + let filter_value_result = kb_serialize_required_value(filter, "blockSubscribe filter"); + let filter_value = match filter_value_result { + Ok(filter_value) => filter_value, + Err(error) => return Err(error), + }; + let config_value_result = + kb_serialize_optional_config_value(config, "blockSubscribe config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + self.block_subscribe(filter_value, config_value).await + } + + /// Typed helper for `logsSubscribe`. + pub async fn logs_subscribe_typed( + &self, + filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, + config: std::option::Option, + ) -> Result { + let filter_value_result = kb_serialize_required_value(filter, "logsSubscribe filter"); + let filter_value = match filter_value_result { + Ok(filter_value) => filter_value, + Err(error) => return Err(error), + }; + let config_value_result = + kb_serialize_optional_config_value(config, "logsSubscribe config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + self.logs_subscribe(filter_value, config_value).await + } + + /// Typed helper for `programSubscribe`. + pub async fn program_subscribe_typed( + &self, + program_id: std::string::String, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_serialize_optional_config_value(config, "programSubscribe config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + self.program_subscribe(program_id, config_value).await + } + + /// Typed helper for `signatureSubscribe`. + pub async fn signature_subscribe_typed( + &self, + signature: std::string::String, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_serialize_optional_config_value(config, "signatureSubscribe config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + self.signature_subscribe(signature, config_value).await + } +} + +fn kb_serialize_required_value( + value: T, + label: &str, +) -> Result +where + T: serde::Serialize, +{ + let value_result = serde_json::to_value(value); + match value_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize {}: {error}", + label + ))), + } +} + +fn kb_serialize_optional_config_value( + value: std::option::Option, + label: &str, +) -> Result, crate::KbError> +where + T: serde::Serialize, +{ + match value { + Some(value) => { + let value_result = serde_json::to_value(value); + match value_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize {}: {error}", + label + ))), + } + } + None => Ok(None), + } +} + +#[cfg(test)] +mod tests { + fn make_test_ws_client() -> crate::WsClient { + let endpoint = crate::KbWsEndpointConfig { + name: "test_ws".to_string(), + enabled: true, + provider: "test".to_string(), + url: "ws://127.0.0.1:65535".to_string(), + api_key_env_var: None, + roles: vec!["test".to_string()], + max_subscriptions: 8, + connect_timeout_ms: 1000, + request_timeout_ms: 1000, + unsubscribe_timeout_ms: 500, + write_channel_capacity: 8, + event_channel_capacity: 32, + auto_reconnect: false, + }; + crate::WsClient::new(endpoint).expect("client creation must succeed") + } + + #[test] + fn parse_root_notification_works() { + let notification = crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "rootNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::Value::from(123u64), + subscription: 7, + }, + }; + let parsed = crate::parse_kb_solana_ws_typed_notification(¬ification) + .expect("typed root notification parse must succeed"); + match parsed { + crate::KbSolanaWsTypedNotification::Root(root) => { + assert_eq!(root, 123); + } + other => { + panic!("unexpected parsed notification: {other:?}"); + } + } + } + + #[test] + fn parse_slot_notification_works() { + let notification = crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "slotNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::json!({ + "parent": 10, + "root": 11, + "slot": 12 + }), + subscription: 8, + }, + }; + + let parsed = crate::parse_kb_solana_ws_typed_notification(¬ification) + .expect("typed slot notification parse must succeed"); + match parsed { + crate::KbSolanaWsTypedNotification::Slot(slot_info) => { + assert_eq!(slot_info.parent, 10); + assert_eq!(slot_info.root, 11); + assert_eq!(slot_info.slot, 12); + } + other => { + panic!("unexpected parsed notification: {other:?}"); + } + } + } + + #[tokio::test] + async fn account_subscribe_typed_uses_same_send_path() { + let client = make_test_ws_client(); + let result = client + .account_subscribe_typed("11111111111111111111111111111111".to_string(), None) + .await; + match result { + Err(crate::KbError::NotConnected(_)) => {} + other => { + panic!("unexpected result: {other:?}"); + } + } + } + + #[tokio::test] + async fn signature_subscribe_typed_uses_same_send_path() { + let client = make_test_ws_client(); + let result = client + .signature_subscribe_typed( + "2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b".to_string(), + None, + ) + .await; + match result { + Err(crate::KbError::NotConnected(_)) => {} + other => { + panic!("unexpected result: {other:?}"); + } + } + } +}