Files
khadhroony-bobot/khbb_lib/src/ws_event.rs
2026-04-18 19:22:31 +02:00

267 lines
9.4 KiB
Rust

// file: khbb_lib/src/ws_event.rs
//! Normalized WebSocket events produced from Solana PubSub notifications.
/// Normalized WebSocket event emitted by the listener runtime.
///
/// Each variant wraps the normalized payload for one kind of Solana PubSub
/// notification. Values are built by `normalize_ws_notification`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KhbbWsNormalizedEvent {
    /// Event built from a `slotNotification` message.
    Slot(KhbbWsSlotEvent),
    /// Event built from a `logsNotification` message.
    Logs(KhbbWsLogsEvent),
    /// Event built from a `programNotification` message.
    Program(KhbbWsProgramEvent),
}
/// Normalized form of a `slotNotification` message.
///
/// The field descriptions below reflect how `normalize_ws_notification`
/// populates this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsSlotEvent {
    /// Subscription identifier carried by the notification
    /// (`params.subscription`).
    pub subscription_id: u64,
    /// Kind of the originating subscription; the slot kind is used when the
    /// originating subscription is not known.
    pub source_kind: crate::KhbbWsSubscriptionKind,
    /// Label of the originating subscription, when one is known and set.
    pub source_label: std::option::Option<std::string::String>,
    /// Slot reported by the notification.
    pub slot: u64,
    /// Parent slot reported by the notification.
    pub parent: u64,
    /// Root slot reported by the notification.
    pub root: u64,
}
/// Normalized form of a `logsNotification` message.
///
/// The field descriptions below reflect how `normalize_ws_notification`
/// populates this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsLogsEvent {
    /// Subscription identifier carried by the notification
    /// (`params.subscription`).
    pub subscription_id: u64,
    /// Kind of the originating subscription; the logs kind is used when the
    /// originating subscription is not known.
    pub source_kind: crate::KhbbWsSubscriptionKind,
    /// Label of the originating subscription, when one is known and set.
    pub source_label: std::option::Option<std::string::String>,
    /// Signature of the transaction the logs belong to.
    pub signature: std::string::String,
    /// `true` when the notification value carries an `err` entry.
    pub has_error: bool,
    /// Log lines carried by the notification, passed through unchanged.
    pub logs: std::vec::Vec<std::string::String>,
    /// Slot taken from the notification's `context`.
    pub context_slot: u64,
}
/// Normalized form of a `programNotification` message.
///
/// The field descriptions below reflect how `normalize_ws_notification`
/// populates this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsProgramEvent {
    /// Subscription identifier carried by the notification
    /// (`params.subscription`).
    pub subscription_id: u64,
    /// Kind of the originating subscription; the program kind is used when
    /// the originating subscription is not known.
    pub source_kind: crate::KhbbWsSubscriptionKind,
    /// Label of the originating subscription, when one is known and set.
    pub source_label: std::option::Option<std::string::String>,
    /// Pubkey of the account reported by the notification value.
    pub pubkey: std::string::String,
    /// Slot taken from the notification's `context`.
    pub context_slot: u64,
}
/// Normalizes a raw Solana PubSub notification into an internal event.
///
/// `method` selects the parser applied to `raw`. The recognized methods are
/// `"slotNotification"`, `"logsNotification"` and `"programNotification"`;
/// any other method returns `Ok(None)` without looking at `raw`.
///
/// When `source_subscription` is `Some`, its `kind` and `label` are copied
/// onto the event. When it is `None`, the kind falls back to the variant that
/// matches the method and the label is `None`.
///
/// # Errors
///
/// Returns the error reported by the matching `crate::solana_rpc_ws` parser
/// when `raw` cannot be parsed for a recognized method.
pub(crate) fn normalize_ws_notification(
    method: &str,
    raw: &str,
    source_subscription: std::option::Option<&crate::domain::KhbbActiveWsSubscription>,
) -> core::result::Result<std::option::Option<KhbbWsNormalizedEvent>, crate::KhbbError> {
    match method {
        "slotNotification" => {
            let notification = crate::solana_rpc_ws::parse_slot_notification(raw)?;
            Ok(Some(KhbbWsNormalizedEvent::Slot(KhbbWsSlotEvent {
                subscription_id: notification.params.subscription,
                source_kind: source_subscription
                    .map_or(crate::KhbbWsSubscriptionKind::Slot, |subscription| {
                        subscription.kind
                    }),
                source_label: source_subscription
                    .and_then(|subscription| subscription.label.clone()),
                slot: notification.params.result.slot,
                parent: notification.params.result.parent,
                root: notification.params.result.root,
            })))
        },
        "logsNotification" => {
            let notification = crate::solana_rpc_ws::parse_logs_notification(raw)?;
            Ok(Some(KhbbWsNormalizedEvent::Logs(KhbbWsLogsEvent {
                subscription_id: notification.params.subscription,
                source_kind: source_subscription
                    .map_or(crate::KhbbWsSubscriptionKind::Logs, |subscription| {
                        subscription.kind
                    }),
                source_label: source_subscription
                    .and_then(|subscription| subscription.label.clone()),
                signature: notification.params.result.value.signature,
                // Only the presence of an error is kept, not its content.
                has_error: notification.params.result.value.err.is_some(),
                logs: notification.params.result.value.logs,
                context_slot: notification.params.result.context.slot,
            })))
        },
        "programNotification" => {
            let notification = crate::solana_rpc_ws::parse_program_notification(raw)?;
            Ok(Some(KhbbWsNormalizedEvent::Program(KhbbWsProgramEvent {
                subscription_id: notification.params.subscription,
                source_kind: source_subscription
                    .map_or(crate::KhbbWsSubscriptionKind::Program, |subscription| {
                        subscription.kind
                    }),
                source_label: source_subscription
                    .and_then(|subscription| subscription.label.clone()),
                pubkey: notification.params.result.value.pubkey,
                context_slot: notification.params.result.context.slot,
            })))
        },
        // Unrecognized methods are not an error; there is simply no event.
        _ => Ok(None),
    }
}
#[cfg(test)]
mod tests {
    /// A slot notification without a known subscription yields a `Slot` event
    /// carrying the payload values and the default source metadata.
    #[test]
    fn normalize_slot_notification_returns_slot_event() {
        let raw = r#"{
"jsonrpc":"2.0",
"method":"slotNotification",
"params":{
"subscription":7,
"result":{
"slot":100,
"parent":99,
"root":90
}
}
}"#;
        // `expect` already fails the test on `Err`/`None`, with a message.
        let normalized = super::normalize_ws_notification("slotNotification", raw, None)
            .expect("normalize slot")
            .expect("slot event");
        match normalized {
            super::KhbbWsNormalizedEvent::Slot(event) => {
                assert_eq!(event.subscription_id, 7);
                assert!(matches!(
                    event.source_kind,
                    crate::KhbbWsSubscriptionKind::Slot
                ));
                assert!(event.source_label.is_none());
                assert_eq!(event.slot, 100);
                assert_eq!(event.parent, 99);
                assert_eq!(event.root, 90);
            },
            _ => {
                panic!("expected slot event");
            },
        }
    }

    /// A logs notification without a known subscription yields a `Logs` event
    /// carrying the payload values and the default source metadata.
    #[test]
    fn normalize_logs_notification_returns_logs_event() {
        let raw = r#"{
"jsonrpc":"2.0",
"method":"logsNotification",
"params":{
"subscription":8,
"result":{
"context":{"slot":123},
"value":{
"signature":"abc",
"err":null,
"logs":["log1","log2"]
}
}
}
}"#;
        let normalized = super::normalize_ws_notification("logsNotification", raw, None)
            .expect("normalize logs")
            .expect("logs event");
        match normalized {
            super::KhbbWsNormalizedEvent::Logs(event) => {
                assert_eq!(event.subscription_id, 8);
                assert!(matches!(
                    event.source_kind,
                    crate::KhbbWsSubscriptionKind::Logs
                ));
                assert!(event.source_label.is_none());
                assert_eq!(event.signature, "abc");
                assert!(!event.has_error);
                assert_eq!(event.logs.len(), 2);
                assert_eq!(event.context_slot, 123);
            },
            _ => {
                panic!("expected logs event");
            },
        }
    }

    /// A program notification without a known subscription yields a `Program`
    /// event carrying the payload values and the default source metadata.
    #[test]
    fn normalize_program_notification_returns_program_event() {
        let raw = r#"{
"jsonrpc":"2.0",
"method":"programNotification",
"params":{
"subscription":9,
"result":{
"context":{"slot":456},
"value":{
"pubkey":"TokenAccount1111111111111111111111111111111",
"account":{
"lamports":1,
"data":["","base64"],
"owner":"11111111111111111111111111111111",
"executable":false,
"rentEpoch":0,
"space":0
}
}
}
}
}"#;
        let normalized = super::normalize_ws_notification("programNotification", raw, None)
            .expect("normalize program")
            .expect("program event");
        match normalized {
            super::KhbbWsNormalizedEvent::Program(event) => {
                assert_eq!(event.subscription_id, 9);
                assert!(matches!(
                    event.source_kind,
                    crate::KhbbWsSubscriptionKind::Program
                ));
                assert!(event.source_label.is_none());
                assert_eq!(event.pubkey, "TokenAccount1111111111111111111111111111111");
                assert_eq!(event.context_slot, 456);
            },
            _ => {
                panic!("expected program event");
            },
        }
    }

    /// Methods the normalizer does not recognize produce no event and no error.
    #[test]
    fn normalize_unknown_method_returns_none() {
        let result = super::normalize_ws_notification("unknownNotification", "{}", None);
        assert!(result.expect("normalize unknown").is_none());
    }
}