diff --git a/Cargo.toml b/Cargo.toml index a68f8e2..f09b01b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.5" +version = "0.4.6" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index 793ac9d..4a73559 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -18,6 +18,7 @@ mod solana_rpc_http; mod storage; mod tracing_setup; mod solana_rpc_ws; +mod ws_event; /// Runs the listener application bootstrap workflow. pub use crate::app::run_listener_app; @@ -65,3 +66,11 @@ pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope; pub use crate::solana_rpc_ws::KhbbWsNotificationParams; /// Classified incoming WebSocket message. pub use crate::solana_rpc_ws::KhbbWsIncomingMessage; +/// Normalized WebSocket event emitted by the listener runtime. +pub use crate::ws_event::KhbbWsNormalizedEvent; +/// Normalized slot notification event. +pub use crate::ws_event::KhbbWsSlotEvent; +/// Normalized logs notification event. +pub use crate::ws_event::KhbbWsLogsEvent; +/// Normalized program notification event. +pub use crate::ws_event::KhbbWsProgramEvent; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index fd3d381..efbc11a 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -14,7 +14,6 @@ fn find_active_subscription_by_id<'a>( return Some(subscription); } } - None } @@ -390,85 +389,62 @@ pub async fn run_listener_runtime( } None => None, }; - match method.as_str() { - "slotNotification" => { - let parse_result = - crate::solana_rpc_ws::parse_slot_notification(&raw); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - source_kind = ?source_subscription.map(|value| value.kind), - source_label = ?source_subscription.and_then(|value| value.label.as_deref()), - slot = notification.params.result.slot, - parent = notification.params.result.parent, - root = notification.params.result.root, - "parsed slot notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse slot notification" - ); - } - } + let normalize_result = crate::ws_event::normalize_ws_notification( + &method, + &raw, + source_subscription, + ); + match normalize_result { + Ok(Some(crate::KhbbWsNormalizedEvent::Slot(event))) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = event.subscription_id, + source_kind = ?event.source_kind, + source_label = ?event.source_label, + slot = event.slot, + parent = event.parent, + root = event.root, + "normalized slot event" + ); } - "logsNotification" => { - let parse_result = - crate::solana_rpc_ws::parse_logs_notification(&raw); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - source_kind = ?source_subscription.map(|value| value.kind), - source_label = ?source_subscription.and_then(|value| value.label.as_deref()), - signature = %notification.params.result.value.signature, - "parsed logs notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse logs notification" - ); - } - } + Ok(Some(crate::KhbbWsNormalizedEvent::Logs(event))) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = event.subscription_id, + source_kind = ?event.source_kind, + source_label = ?event.source_label, + signature = %event.signature, + has_error = event.has_error, + context_slot = event.context_slot, + "normalized logs event" + ); } - "programNotification" => { - let parse_result = - crate::solana_rpc_ws::parse_program_notification(&raw); - match parse_result { - Ok(notification) => { - tracing::trace!( - listener_session_id = session.id, - subscription_id = notification.params.subscription, - source_kind = ?source_subscription.map(|value| value.kind), - source_label = ?source_subscription.and_then(|value| value.label.as_deref()), - program_pubkey = %notification.params.result.value.pubkey, - "parsed program notification" - ); - } - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to parse program notification" - ); - } - } + Ok(Some(crate::KhbbWsNormalizedEvent::Program(event))) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = event.subscription_id, + source_kind = ?event.source_kind, + source_label = ?event.source_label, + pubkey = %event.pubkey, + context_slot = event.context_slot, + "normalized program event" + ); } - _ => { + Ok(None) => { tracing::trace!( listener_session_id = session.id, method = %method, "received unsupported websocket notification method" ); } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + method = %method, + error = %error, + "failed to normalize websocket notification" + ); + } } } Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => { diff --git a/khbb_lib/src/ws_event.rs b/khbb_lib/src/ws_event.rs new file mode 100644 index 0000000..36d6910 --- /dev/null +++ b/khbb_lib/src/ws_event.rs @@ -0,0 +1,266 @@ +// file: khbb_lib/src/ws_event.rs + +//! Normalized WebSocket events produced from Solana PubSub notifications. + +/// Normalized WebSocket event emitted by the listener runtime. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KhbbWsNormalizedEvent { + /// Normalized slot notification. + Slot(KhbbWsSlotEvent), + /// Normalized logs notification. + Logs(KhbbWsLogsEvent), + /// Normalized program notification. + Program(KhbbWsProgramEvent), +} + +/// Normalized slot notification event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbWsSlotEvent { + /// Subscription identifier. + pub subscription_id: u64, + /// Subscription kind. + pub source_kind: crate::KhbbWsSubscriptionKind, + /// Optional subscription label. + pub source_label: std::option::Option, + /// Slot. + pub slot: u64, + /// Parent slot. + pub parent: u64, + /// Root slot. + pub root: u64, +} + +/// Normalized logs notification event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbWsLogsEvent { + /// Subscription identifier. + pub subscription_id: u64, + /// Subscription kind. + pub source_kind: crate::KhbbWsSubscriptionKind, + /// Optional subscription label. + pub source_label: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Whether the transaction errored. + pub has_error: bool, + /// Raw logs emitted by the transaction. + pub logs: std::vec::Vec, + /// Context slot. + pub context_slot: u64, +} + +/// Normalized program notification event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbWsProgramEvent { + /// Subscription identifier. + pub subscription_id: u64, + /// Subscription kind. + pub source_kind: crate::KhbbWsSubscriptionKind, + /// Optional subscription label. + pub source_label: std::option::Option, + /// Updated account pubkey. + pub pubkey: std::string::String, + /// Context slot. + pub context_slot: u64, +} + +/// Normalizes a raw Solana PubSub notification into an internal event. +pub(crate) fn normalize_ws_notification( + method: &str, + raw: &str, + source_subscription: std::option::Option<&crate::domain::KhbbActiveWsSubscription>, +) -> core::result::Result, crate::KhbbError> { + match method { + "slotNotification" => { + let parse_result = crate::solana_rpc_ws::parse_slot_notification(raw); + let notification = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let source_kind = match source_subscription { + Some(value) => value.kind, + None => crate::KhbbWsSubscriptionKind::Slot, + }; + let source_label = match source_subscription { + Some(value) => value.label.clone(), + None => None, + }; + Ok(Some(KhbbWsNormalizedEvent::Slot(KhbbWsSlotEvent { + subscription_id: notification.params.subscription, + source_kind, + source_label, + slot: notification.params.result.slot, + parent: notification.params.result.parent, + root: notification.params.result.root, + }))) + }, + "logsNotification" => { + let parse_result = crate::solana_rpc_ws::parse_logs_notification(raw); + let notification = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let source_kind = match source_subscription { + Some(value) => value.kind, + None => crate::KhbbWsSubscriptionKind::Logs, + }; + let source_label = match source_subscription { + Some(value) => value.label.clone(), + None => None, + }; + Ok(Some(KhbbWsNormalizedEvent::Logs(KhbbWsLogsEvent { + subscription_id: notification.params.subscription, + source_kind, + source_label, + signature: notification.params.result.value.signature, + has_error: notification.params.result.value.err.is_some(), + logs: notification.params.result.value.logs, + context_slot: notification.params.result.context.slot, + }))) + }, + "programNotification" => { + let parse_result = crate::solana_rpc_ws::parse_program_notification(raw); + let notification = match parse_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let source_kind = match source_subscription { + Some(value) => value.kind, + None => crate::KhbbWsSubscriptionKind::Program, + }; + let source_label = match source_subscription { + Some(value) => value.label.clone(), + None => None, + }; + Ok(Some(KhbbWsNormalizedEvent::Program(KhbbWsProgramEvent { + subscription_id: notification.params.subscription, + source_kind, + source_label, + pubkey: notification.params.result.value.pubkey, + context_slot: notification.params.result.context.slot, + }))) + }, + _ => Ok(None), + } +} + +#[cfg(test)] +mod tests { + #[test] + fn normalize_slot_notification_returns_slot_event() { + let raw = r#"{ + "jsonrpc":"2.0", + "method":"slotNotification", + "params":{ + "subscription":7, + "result":{ + "slot":100, + "parent":99, + "root":90 + } + } + }"#; + let result = super::normalize_ws_notification("slotNotification", raw, None); + assert!(result.is_ok()); + let event_option = result.expect("normalize slot"); + assert!(event_option.is_some()); + match event_option.expect("slot event") { + super::KhbbWsNormalizedEvent::Slot(event) => { + assert_eq!(event.subscription_id, 7); + assert_eq!(event.slot, 100); + assert_eq!(event.parent, 99); + assert_eq!(event.root, 90); + }, + _ => { + panic!("expected slot event"); + }, + } + } + + #[test] + fn normalize_logs_notification_returns_logs_event() { + let raw = r#"{ + "jsonrpc":"2.0", + "method":"logsNotification", + "params":{ + "subscription":8, + "result":{ + "context":{"slot":123}, + "value":{ + "signature":"abc", + "err":null, + "logs":["log1","log2"] + } + } + } + }"#; + let result = super::normalize_ws_notification("logsNotification", raw, None); + assert!(result.is_ok()); + let event_option = result.expect("normalize logs"); + assert!(event_option.is_some()); + match event_option.expect("logs event") { + super::KhbbWsNormalizedEvent::Logs(event) => { + assert_eq!(event.subscription_id, 8); + assert_eq!(event.signature, "abc"); + assert!(!event.has_error); + assert_eq!(event.logs.len(), 2); + assert_eq!(event.context_slot, 123); + }, + _ => { + panic!("expected logs event"); + }, + } + } + + #[test] + fn normalize_program_notification_returns_program_event() { + let raw = r#"{ + "jsonrpc":"2.0", + "method":"programNotification", + "params":{ + "subscription":9, + "result":{ + "context":{"slot":456}, + "value":{ + "pubkey":"TokenAccount1111111111111111111111111111111", + "account":{ + "lamports":1, + "data":["","base64"], + "owner":"11111111111111111111111111111111", + "executable":false, + "rentEpoch":0, + "space":0 + } + } + } + } + }"#; + let result = super::normalize_ws_notification("programNotification", raw, None); + assert!(result.is_ok()); + let event_option = result.expect("normalize program"); + assert!(event_option.is_some()); + match event_option.expect("program event") { + super::KhbbWsNormalizedEvent::Program(event) => { + assert_eq!(event.subscription_id, 9); + assert_eq!(event.pubkey, "TokenAccount1111111111111111111111111111111"); + assert_eq!(event.context_slot, 456); + }, + _ => { + panic!("expected program event"); + }, + } + } + + #[test] + fn normalize_unknown_method_returns_none() { + let result = super::normalize_ws_notification("unknownNotification", "{}", None); + assert!(result.is_ok()); + assert!(result.expect("normalize unknown").is_none()); + } +}