This commit is contained in:
2026-04-18 19:33:23 +02:00
parent f8375723de
commit f011a4a7d3
4 changed files with 302 additions and 35 deletions

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.4.6"
version = "0.5.0"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot"

View File

@@ -0,0 +1,233 @@
// file: khbb_lib/src/domain_event.rs
//! Domain-level events derived from normalized WebSocket notifications.
/// Domain-level event derived from a normalized WebSocket event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum KhbbDomainEvent {
/// A slot advancement event.
SlotAdvanced(KhbbSlotAdvancedEvent),
/// A generic transaction logs activity event.
TransactionLogActivity(KhbbTransactionLogActivityEvent),
/// A token-program-related account activity event.
TokenProgramActivity(KhbbTokenProgramActivityEvent),
}
/// Domain event emitted when a new slot notification is received.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbSlotAdvancedEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Source subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional source label.
pub source_label: std::option::Option<std::string::String>,
/// Current slot.
pub slot: u64,
/// Parent slot.
pub parent: u64,
/// Current root slot.
pub root: u64,
}
/// Domain event emitted when a logs notification is received.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbTransactionLogActivityEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Source subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional source label.
pub source_label: std::option::Option<std::string::String>,
/// Transaction signature.
pub signature: std::string::String,
/// Whether the transaction errored.
pub has_error: bool,
/// Context slot.
pub context_slot: u64,
/// Number of log lines.
pub log_count: usize,
}
/// Domain event emitted when activity is observed on a token program.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbTokenProgramActivityEvent {
/// Subscription identifier.
pub subscription_id: u64,
/// Source subscription kind.
pub source_kind: crate::KhbbWsSubscriptionKind,
/// Optional source label.
pub source_label: std::option::Option<std::string::String>,
/// Updated account pubkey.
pub pubkey: std::string::String,
/// Context slot.
pub context_slot: u64,
/// Token program family label.
pub token_program_family: std::string::String,
}
/// Derives a domain event from a normalized WebSocket event.
///
/// Not every normalized event necessarily maps to a domain event. Unsupported
/// or not-yet-interesting events return `Ok(None)`.
pub(crate) fn derive_domain_event_from_ws_event(
event: &crate::KhbbWsNormalizedEvent,
) -> core::result::Result<std::option::Option<KhbbDomainEvent>, crate::KhbbError> {
match event {
crate::KhbbWsNormalizedEvent::Slot(slot_event) => {
Ok(Some(KhbbDomainEvent::SlotAdvanced(KhbbSlotAdvancedEvent {
subscription_id: slot_event.subscription_id,
source_kind: slot_event.source_kind,
source_label: slot_event.source_label.clone(),
slot: slot_event.slot,
parent: slot_event.parent,
root: slot_event.root,
})))
}
crate::KhbbWsNormalizedEvent::Logs(logs_event) => {
Ok(Some(KhbbDomainEvent::TransactionLogActivity(
KhbbTransactionLogActivityEvent {
subscription_id: logs_event.subscription_id,
source_kind: logs_event.source_kind,
source_label: logs_event.source_label.clone(),
signature: logs_event.signature.clone(),
has_error: logs_event.has_error,
context_slot: logs_event.context_slot,
log_count: logs_event.logs.len(),
},
)))
}
crate::KhbbWsNormalizedEvent::Program(program_event) => {
let label_option = program_event.source_label.as_deref();
let token_program_family = match label_option {
Some("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") => {
Some(std::string::String::from("spl-token"))
}
Some("TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb") => {
Some(std::string::String::from("spl-token-2022"))
}
_ => None,
};
match token_program_family {
Some(family) => Ok(Some(KhbbDomainEvent::TokenProgramActivity(
KhbbTokenProgramActivityEvent {
subscription_id: program_event.subscription_id,
source_kind: program_event.source_kind,
source_label: program_event.source_label.clone(),
pubkey: program_event.pubkey.clone(),
context_slot: program_event.context_slot,
token_program_family: family,
},
))),
None => Ok(None),
}
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn derive_slot_event_returns_slot_advanced() {
let ws_event = crate::KhbbWsNormalizedEvent::Slot(crate::KhbbWsSlotEvent {
subscription_id: 1,
source_kind: crate::KhbbWsSubscriptionKind::Slot,
source_label: None,
slot: 100,
parent: 99,
root: 90,
});
let result = super::derive_domain_event_from_ws_event(&ws_event);
assert!(result.is_ok());
let domain_event_option = result.expect("derive slot domain event");
assert!(domain_event_option.is_some());
match domain_event_option.expect("slot domain event") {
super::KhbbDomainEvent::SlotAdvanced(event) => {
assert_eq!(event.subscription_id, 1);
assert_eq!(event.slot, 100);
assert_eq!(event.parent, 99);
assert_eq!(event.root, 90);
}
_ => {
panic!("expected slot advanced event");
}
}
}
#[test]
fn derive_logs_event_returns_transaction_log_activity() {
let ws_event = crate::KhbbWsNormalizedEvent::Logs(crate::KhbbWsLogsEvent {
subscription_id: 2,
source_kind: crate::KhbbWsSubscriptionKind::Logs,
source_label: None,
signature: std::string::String::from("sig-1"),
has_error: false,
logs: vec![
std::string::String::from("log-1"),
std::string::String::from("log-2"),
],
context_slot: 123,
});
let result = super::derive_domain_event_from_ws_event(&ws_event);
assert!(result.is_ok());
let domain_event_option = result.expect("derive logs domain event");
assert!(domain_event_option.is_some());
match domain_event_option.expect("logs domain event") {
super::KhbbDomainEvent::TransactionLogActivity(event) => {
assert_eq!(event.subscription_id, 2);
assert_eq!(event.signature, "sig-1");
assert!(!event.has_error);
assert_eq!(event.context_slot, 123);
assert_eq!(event.log_count, 2);
}
_ => {
panic!("expected transaction log activity event");
}
}
}
#[test]
fn derive_program_event_returns_token_program_activity_for_spl_token() {
let ws_event = crate::KhbbWsNormalizedEvent::Program(crate::KhbbWsProgramEvent {
subscription_id: 3,
source_kind: crate::KhbbWsSubscriptionKind::Program,
source_label: Some(std::string::String::from(
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
)),
pubkey: std::string::String::from("SomeTokenAccountPubkey"),
context_slot: 456,
});
let result = super::derive_domain_event_from_ws_event(&ws_event);
assert!(result.is_ok());
let domain_event_option = result.expect("derive program domain event");
assert!(domain_event_option.is_some());
match domain_event_option.expect("program domain event") {
super::KhbbDomainEvent::TokenProgramActivity(event) => {
assert_eq!(event.subscription_id, 3);
assert_eq!(event.pubkey, "SomeTokenAccountPubkey");
assert_eq!(event.context_slot, 456);
assert_eq!(event.token_program_family, "spl-token");
}
_ => {
panic!("expected token program activity event");
}
}
}
#[test]
fn derive_program_event_returns_none_for_unknown_program_family() {
let ws_event = crate::KhbbWsNormalizedEvent::Program(crate::KhbbWsProgramEvent {
subscription_id: 4,
source_kind: crate::KhbbWsSubscriptionKind::Program,
source_label: Some(std::string::String::from("UnknownProgram11111111111111111111111111111111")),
pubkey: std::string::String::from("SomePubkey"),
context_slot: 789,
});
let result = super::derive_domain_event_from_ws_event(&ws_event);
assert!(result.is_ok());
let domain_event_option = result.expect("derive unknown program domain event");
assert!(domain_event_option.is_none());
}
}

View File

@@ -19,6 +19,7 @@ mod storage;
mod tracing_setup;
mod solana_rpc_ws;
mod ws_event;
mod domain_event;
/// Runs the listener application bootstrap workflow.
pub use crate::app::run_listener_app;
@@ -74,3 +75,11 @@ pub use crate::ws_event::KhbbWsSlotEvent;
pub use crate::ws_event::KhbbWsLogsEvent;
/// Normalized program notification event.
pub use crate::ws_event::KhbbWsProgramEvent;
/// Domain-level event derived from a normalized WebSocket event.
pub use crate::domain_event::KhbbDomainEvent;
/// Domain event emitted when a new slot notification is received.
pub use crate::domain_event::KhbbSlotAdvancedEvent;
/// Domain event emitted when a logs notification is received.
pub use crate::domain_event::KhbbTransactionLogActivityEvent;
/// Domain event emitted when activity is observed on a token program.
pub use crate::domain_event::KhbbTokenProgramActivityEvent;

View File

@@ -395,40 +395,65 @@ pub async fn run_listener_runtime(
source_subscription,
);
match normalize_result {
Ok(Some(crate::KhbbWsNormalizedEvent::Slot(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
slot = event.slot,
parent = event.parent,
root = event.root,
"normalized slot event"
);
}
Ok(Some(crate::KhbbWsNormalizedEvent::Logs(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
signature = %event.signature,
has_error = event.has_error,
context_slot = event.context_slot,
"normalized logs event"
);
}
Ok(Some(crate::KhbbWsNormalizedEvent::Program(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
pubkey = %event.pubkey,
context_slot = event.context_slot,
"normalized program event"
);
Ok(Some(normalized_event)) => {
let domain_event_result =
crate::domain_event::derive_domain_event_from_ws_event(
&normalized_event,
);
match domain_event_result {
Ok(Some(crate::KhbbDomainEvent::SlotAdvanced(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
slot = event.slot,
parent = event.parent,
root = event.root,
"domain slot advanced event"
);
}
Ok(Some(crate::KhbbDomainEvent::TransactionLogActivity(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
signature = %event.signature,
has_error = event.has_error,
context_slot = event.context_slot,
log_count = event.log_count,
"domain transaction log activity event"
);
}
Ok(Some(crate::KhbbDomainEvent::TokenProgramActivity(event))) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = event.subscription_id,
source_kind = ?event.source_kind,
source_label = ?event.source_label,
pubkey = %event.pubkey,
context_slot = event.context_slot,
token_program_family = %event.token_program_family,
"domain token program activity event"
);
}
Ok(None) => {
tracing::trace!(
listener_session_id = session.id,
method = %method,
"normalized websocket event did not map to a domain event"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
method = %method,
error = %error,
"failed to derive domain event from normalized websocket event"
);
}
}
}
Ok(None) => {
tracing::trace!(