diff --git a/Cargo.toml b/Cargo.toml index 02d1959..a68f8e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.4" +version = "0.4.5" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/domain.rs b/khbb_lib/src/domain.rs index 6097273..de3160f 100644 --- a/khbb_lib/src/domain.rs +++ b/khbb_lib/src/domain.rs @@ -35,3 +35,18 @@ pub(crate) struct KhbbTrackedToken { /// UTC creation timestamp in RFC3339 format. pub created_at: std::string::String, } + +/// Active WebSocket subscription tracked by the listener runtime. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub(crate) struct KhbbActiveWsSubscription { + /// Client-side request identifier used to create the subscription. + pub request_id: u64, + /// Server-side subscription identifier returned by the RPC node. + pub subscription_id: u64, + /// Subscription kind. + pub kind: crate::KhbbWsSubscriptionKind, + /// Optional label associated with the subscription. + /// + /// For program subscriptions, this stores the subscribed program id. + pub label: std::option::Option, +} diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index 6322522..fd3d381 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -5,6 +5,19 @@ //! This module does not yet connect to Solana RPC, WebSocket or gRPC streams. //! It prepares the runtime structure, persistence hooks and shutdown behavior. +fn find_active_subscription_by_id<'a>( + subscriptions: &'a [crate::domain::KhbbActiveWsSubscription], + subscription_id: u64, +) -> std::option::Option<&'a crate::domain::KhbbActiveWsSubscription> { + for subscription in subscriptions { + if subscription.subscription_id == subscription_id { + return Some(subscription); + } + } + + None +} + /// Runs the current listener runtime skeleton. pub async fn run_listener_runtime( pool: &sqlx::SqlitePool, @@ -59,7 +72,8 @@ pub async fn run_listener_runtime( return Err(error); }, } - let mut ws_subscription_handles = std::vec::Vec::::new(); + let mut active_ws_subscriptions = + std::vec::Vec::::new(); if config.enable_ws_slot_subscribe { let slot_subscribe_result = ws_client.slot_subscribe(1).await; let slot_subscribe_output = match slot_subscribe_result { @@ -113,7 +127,12 @@ pub async fn run_listener_runtime( subscription_id = slot_subscription_handle.subscription_id, "slot websocket subscription established" ); - ws_subscription_handles.push(slot_subscription_handle); + active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription { + request_id: slot_subscription_handle.request_id, + subscription_id: slot_subscription_handle.subscription_id, + kind: slot_subscription_handle.kind, + label: None, + }); } if config.enable_ws_logs_subscribe { let logs_subscribe_result = ws_client @@ -170,7 +189,12 @@ pub async fn run_listener_runtime( subscription_id = logs_subscription_handle.subscription_id, "logs websocket subscription established" ); - ws_subscription_handles.push(logs_subscription_handle); + active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription { + request_id: logs_subscription_handle.request_id, + subscription_id: logs_subscription_handle.subscription_id, + kind: logs_subscription_handle.kind, + label: None, + }); } if config.enable_ws_program_subscribe { let mut program_request_id: u64 = 10; @@ -231,7 +255,12 @@ pub async fn run_listener_runtime( program_id = %program_id, "program websocket subscription established" ); - ws_subscription_handles.push(program_subscription_handle); + active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription { + request_id: program_subscription_handle.request_id, + subscription_id: program_subscription_handle.subscription_id, + kind: program_subscription_handle.kind, + label: Some(program_id.clone()), + }); program_request_id = program_request_id.saturating_add(1); } } @@ -341,6 +370,26 @@ pub async fn run_listener_runtime( ); } } + let raw_json_result = + serde_json::from_str::(&raw); + let source_subscription_id = match raw_json_result { + Ok(json_value) => { + json_value + .get("params") + .and_then(|value| value.get("subscription")) + .and_then(serde_json::Value::as_u64) + } + Err(_) => None, + }; + let source_subscription = match source_subscription_id { + Some(subscription_id) => { + find_active_subscription_by_id( + &active_ws_subscriptions, + subscription_id, + ) + } + None => None, + }; match method.as_str() { "slotNotification" => { let parse_result = @@ -350,6 +399,8 @@ pub async fn run_listener_runtime( tracing::trace!( listener_session_id = session.id, subscription_id = notification.params.subscription, + source_kind = ?source_subscription.map(|value| value.kind), + source_label = ?source_subscription.and_then(|value| value.label.as_deref()), slot = notification.params.result.slot, parent = notification.params.result.parent, root = notification.params.result.root, @@ -373,6 +424,8 @@ pub async fn run_listener_runtime( tracing::trace!( listener_session_id = session.id, subscription_id = notification.params.subscription, + source_kind = ?source_subscription.map(|value| value.kind), + source_label = ?source_subscription.and_then(|value| value.label.as_deref()), signature = %notification.params.result.value.signature, "parsed logs notification" ); @@ -394,6 +447,8 @@ pub async fn run_listener_runtime( tracing::trace!( listener_session_id = session.id, subscription_id = notification.params.subscription, + source_kind = ?source_subscription.map(|value| value.kind), + source_label = ?source_subscription.and_then(|value| value.label.as_deref()), program_pubkey = %notification.params.result.value.pubkey, "parsed program notification" ); @@ -503,18 +558,16 @@ pub async fn run_listener_runtime( } tracing::info!( listener_session_id = session.id, - subscription_count = ws_subscription_handles.len(), + subscription_count = active_ws_subscriptions.len(), "starting websocket unsubscribe phase" ); - for subscription_handle in &ws_subscription_handles { + for subscription in &active_ws_subscriptions { let unsubscribe_timeout_result = tokio::time::timeout( std::time::Duration::from_millis(500), ws_client.unsubscribe( - subscription_handle.kind, - subscription_handle.subscription_id, - 1000u64 - .saturating_add(tick_count) - .saturating_add(subscription_handle.request_id), + subscription.kind, + subscription.subscription_id, + 1000u64.saturating_add(tick_count).saturating_add(subscription.request_id), ), ) .await; @@ -524,8 +577,9 @@ pub async fn run_listener_runtime( tracing::info!( listener_session_id = session.id, unsubscribed = value, - subscription_id = subscription_handle.subscription_id, - kind = ?subscription_handle.kind, + subscription_id = subscription.subscription_id, + kind = ?subscription.kind, + label = ?subscription.label, "websocket subscription cancelled" ); }, @@ -533,8 +587,9 @@ pub async fn run_listener_runtime( tracing::error!( listener_session_id = session.id, error = %error, - subscription_id = subscription_handle.subscription_id, - kind = ?subscription_handle.kind, + subscription_id = subscription.subscription_id, + kind = ?subscription.kind, + label = ?subscription.label, "failed to cancel websocket subscription" ); }, @@ -542,8 +597,9 @@ pub async fn run_listener_runtime( Err(_) => { tracing::error!( listener_session_id = session.id, - subscription_id = subscription_handle.subscription_id, - kind = ?subscription_handle.kind, + subscription_id = subscription.subscription_id, + kind = ?subscription.kind, + label = ?subscription.label, "websocket unsubscribe timed out" ); }, diff --git a/khbb_lib/src/solana_rpc_ws.rs b/khbb_lib/src/solana_rpc_ws.rs index 2535e4e..04d08c0 100644 --- a/khbb_lib/src/solana_rpc_ws.rs +++ b/khbb_lib/src/solana_rpc_ws.rs @@ -17,7 +17,7 @@ pub struct KhbbSolanaWsRpcClientConfig { } /// Supported WebSocket subscription kinds for the initial implementation. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum KhbbWsSubscriptionKind { /// `slotSubscribe` Slot,