This commit is contained in:
2026-04-18 19:22:31 +02:00
parent 17587bdde5
commit f8375723de
4 changed files with 323 additions and 72 deletions

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.4.5"
version = "0.4.6"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot"

View File

@@ -18,6 +18,7 @@ mod solana_rpc_http;
mod storage;
mod tracing_setup;
mod solana_rpc_ws;
mod ws_event;
/// Runs the listener application bootstrap workflow.
pub use crate::app::run_listener_app;
@@ -65,3 +66,11 @@ pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope;
pub use crate::solana_rpc_ws::KhbbWsNotificationParams;
/// Classified incoming WebSocket message.
pub use crate::solana_rpc_ws::KhbbWsIncomingMessage;
/// Normalized WebSocket event emitted by the listener runtime.
pub use crate::ws_event::KhbbWsNormalizedEvent;
/// Normalized slot notification event.
pub use crate::ws_event::KhbbWsSlotEvent;
/// Normalized logs notification event.
pub use crate::ws_event::KhbbWsLogsEvent;
/// Normalized program notification event.
pub use crate::ws_event::KhbbWsProgramEvent;

View File

@@ -14,7 +14,6 @@ fn find_active_subscription_by_id<'a>(
return Some(subscription);
}
}
None
}
@@ -390,85 +389,62 @@ pub async fn run_listener_runtime(
}
None => None,
};
match method.as_str() {
"slotNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_slot_notification(&raw);
match parse_result {
Ok(notification) => {
let normalize_result = crate::ws_event::normalize_ws_notification(
&method,
&raw,
source_subscription,
);
match normalize_result {
Ok(Some(crate::KhbbWsNormalizedEvent::Slot(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
slot = notification.params.result.slot,
parent = notification.params.result.parent,
root = notification.params.result.root,
"parsed slot notification"
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
slot = event.slot,
parent = event.parent,
root = event.root,
"normalized slot event"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse slot notification"
);
}
}
}
"logsNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_logs_notification(&raw);
match parse_result {
Ok(notification) => {
Ok(Some(crate::KhbbWsNormalizedEvent::Logs(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
signature = %notification.params.result.value.signature,
"parsed logs notification"
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
signature = %event.signature,
has_error = event.has_error,
context_slot = event.context_slot,
"normalized logs event"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse logs notification"
);
}
}
}
"programNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_program_notification(&raw);
match parse_result {
Ok(notification) => {
Ok(Some(crate::KhbbWsNormalizedEvent::Program(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
program_pubkey = %notification.params.result.value.pubkey,
"parsed program notification"
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
pubkey = %event.pubkey,
context_slot = event.context_slot,
"normalized program event"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse program notification"
);
}
}
}
_ => {
Ok(None) => {
tracing::trace!(
listener_session_id = session.id,
method = %method,
"received unsupported websocket notification method"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
method = %method,
error = %error,
"failed to normalize websocket notification"
);
}
}
}
Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => {

266
khbb_lib/src/ws_event.rs Normal file
View File

@@ -0,0 +1,266 @@
// file: khbb_lib/src/ws_event.rs
//! Normalized WebSocket events produced from Solana PubSub notifications.
/// Normalized WebSocket event emitted by the listener runtime.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KhbbWsNormalizedEvent {
/// Normalized slot notification.
Slot(KhbbWsSlotEvent),
/// Normalized logs notification.
Logs(KhbbWsLogsEvent),
/// Normalized program notification.
Program(KhbbWsProgramEvent),
}
/// Normalized slot notification event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsSlotEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional subscription label.
pub source_label: std::option::Option<std::string::String>,
/// Slot.
pub slot: u64,
/// Parent slot.
pub parent: u64,
/// Root slot.
pub root: u64,
}
/// Normalized logs notification event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsLogsEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional subscription label.
pub source_label: std::option::Option<std::string::String>,
/// Transaction signature.
pub signature: std::string::String,
/// Whether the transaction errored.
pub has_error: bool,
/// Raw logs emitted by the transaction.
pub logs: std::vec::Vec<std::string::String>,
/// Context slot.
pub context_slot: u64,
}
/// Normalized program notification event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbWsProgramEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional subscription label.
pub source_label: std::option::Option<std::string::String>,
/// Updated account pubkey.
pub pubkey: std::string::String,
/// Context slot.
pub context_slot: u64,
}
/// Normalizes a raw Solana PubSub notification into an internal event.
pub(crate) fn normalize_ws_notification(
method: &str,
raw: &str,
source_subscription: std::option::Option<&crate::domain::KhbbActiveWsSubscription>,
) -> core::result::Result<std::option::Option<KhbbWsNormalizedEvent>, crate::KhbbError> {
match method {
"slotNotification" => {
let parse_result = crate::solana_rpc_ws::parse_slot_notification(raw);
let notification = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let source_kind = match source_subscription {
Some(value) => value.kind,
None => crate::KhbbWsSubscriptionKind::Slot,
};
let source_label = match source_subscription {
Some(value) => value.label.clone(),
None => None,
};
Ok(Some(KhbbWsNormalizedEvent::Slot(KhbbWsSlotEvent {
subscription_id: notification.params.subscription,
source_kind,
source_label,
slot: notification.params.result.slot,
parent: notification.params.result.parent,
root: notification.params.result.root,
})))
},
"logsNotification" => {
let parse_result = crate::solana_rpc_ws::parse_logs_notification(raw);
let notification = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let source_kind = match source_subscription {
Some(value) => value.kind,
None => crate::KhbbWsSubscriptionKind::Logs,
};
let source_label = match source_subscription {
Some(value) => value.label.clone(),
None => None,
};
Ok(Some(KhbbWsNormalizedEvent::Logs(KhbbWsLogsEvent {
subscription_id: notification.params.subscription,
source_kind,
source_label,
signature: notification.params.result.value.signature,
has_error: notification.params.result.value.err.is_some(),
logs: notification.params.result.value.logs,
context_slot: notification.params.result.context.slot,
})))
},
"programNotification" => {
let parse_result = crate::solana_rpc_ws::parse_program_notification(raw);
let notification = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let source_kind = match source_subscription {
Some(value) => value.kind,
None => crate::KhbbWsSubscriptionKind::Program,
};
let source_label = match source_subscription {
Some(value) => value.label.clone(),
None => None,
};
Ok(Some(KhbbWsNormalizedEvent::Program(KhbbWsProgramEvent {
subscription_id: notification.params.subscription,
source_kind,
source_label,
pubkey: notification.params.result.value.pubkey,
context_slot: notification.params.result.context.slot,
})))
},
_ => Ok(None),
}
}
#[cfg(test)]
mod tests {
#[test]
fn normalize_slot_notification_returns_slot_event() {
let raw = r#"{
"jsonrpc":"2.0",
"method":"slotNotification",
"params":{
"subscription":7,
"result":{
"slot":100,
"parent":99,
"root":90
}
}
}"#;
let result = super::normalize_ws_notification("slotNotification", raw, None);
assert!(result.is_ok());
let event_option = result.expect("normalize slot");
assert!(event_option.is_some());
match event_option.expect("slot event") {
super::KhbbWsNormalizedEvent::Slot(event) => {
assert_eq!(event.subscription_id, 7);
assert_eq!(event.slot, 100);
assert_eq!(event.parent, 99);
assert_eq!(event.root, 90);
},
_ => {
panic!("expected slot event");
},
}
}
#[test]
fn normalize_logs_notification_returns_logs_event() {
let raw = r#"{
"jsonrpc":"2.0",
"method":"logsNotification",
"params":{
"subscription":8,
"result":{
"context":{"slot":123},
"value":{
"signature":"abc",
"err":null,
"logs":["log1","log2"]
}
}
}
}"#;
let result = super::normalize_ws_notification("logsNotification", raw, None);
assert!(result.is_ok());
let event_option = result.expect("normalize logs");
assert!(event_option.is_some());
match event_option.expect("logs event") {
super::KhbbWsNormalizedEvent::Logs(event) => {
assert_eq!(event.subscription_id, 8);
assert_eq!(event.signature, "abc");
assert!(!event.has_error);
assert_eq!(event.logs.len(), 2);
assert_eq!(event.context_slot, 123);
},
_ => {
panic!("expected logs event");
},
}
}
#[test]
fn normalize_program_notification_returns_program_event() {
let raw = r#"{
"jsonrpc":"2.0",
"method":"programNotification",
"params":{
"subscription":9,
"result":{
"context":{"slot":456},
"value":{
"pubkey":"TokenAccount1111111111111111111111111111111",
"account":{
"lamports":1,
"data":["","base64"],
"owner":"11111111111111111111111111111111",
"executable":false,
"rentEpoch":0,
"space":0
}
}
}
}
}"#;
let result = super::normalize_ws_notification("programNotification", raw, None);
assert!(result.is_ok());
let event_option = result.expect("normalize program");
assert!(event_option.is_some());
match event_option.expect("program event") {
super::KhbbWsNormalizedEvent::Program(event) => {
assert_eq!(event.subscription_id, 9);
assert_eq!(event.pubkey, "TokenAccount1111111111111111111111111111111");
assert_eq!(event.context_slot, 456);
},
_ => {
panic!("expected program event");
},
}
}
#[test]
fn normalize_unknown_method_returns_none() {
let result = super::normalize_ws_notification("unknownNotification", "{}", None);
assert!(result.is_ok());
assert!(result.expect("normalize unknown").is_none());
}
}