This commit is contained in:
2026-04-18 19:06:07 +02:00
parent c2ec9b8b41
commit 17587bdde5
4 changed files with 90 additions and 19 deletions

View File

@@ -8,7 +8,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.4.4" version = "0.4.5"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot"

View File

@@ -35,3 +35,18 @@ pub(crate) struct KhbbTrackedToken {
/// UTC creation timestamp in RFC3339 format. /// UTC creation timestamp in RFC3339 format.
pub created_at: std::string::String, pub created_at: std::string::String,
} }
/// Active WebSocket subscription tracked by the listener runtime.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub(crate) struct KhbbActiveWsSubscription {
/// Client-side request identifier used to create the subscription.
pub request_id: u64,
/// Server-side subscription identifier returned by the RPC node.
pub subscription_id: u64,
/// Subscription kind.
pub kind: crate::KhbbWsSubscriptionKind,
/// Optional label associated with the subscription.
///
/// For program subscriptions, this stores the subscribed program id.
pub label: std::option::Option<std::string::String>,
}

View File

@@ -5,6 +5,19 @@
//! This module does not yet connect to Solana RPC, WebSocket or gRPC streams. //! This module does not yet connect to Solana RPC, WebSocket or gRPC streams.
//! It prepares the runtime structure, persistence hooks and shutdown behavior. //! It prepares the runtime structure, persistence hooks and shutdown behavior.
fn find_active_subscription_by_id<'a>(
subscriptions: &'a [crate::domain::KhbbActiveWsSubscription],
subscription_id: u64,
) -> std::option::Option<&'a crate::domain::KhbbActiveWsSubscription> {
for subscription in subscriptions {
if subscription.subscription_id == subscription_id {
return Some(subscription);
}
}
None
}
/// Runs the current listener runtime skeleton. /// Runs the current listener runtime skeleton.
pub async fn run_listener_runtime( pub async fn run_listener_runtime(
pool: &sqlx::SqlitePool, pool: &sqlx::SqlitePool,
@@ -59,7 +72,8 @@ pub async fn run_listener_runtime(
return Err(error); return Err(error);
}, },
} }
let mut ws_subscription_handles = std::vec::Vec::<crate::KhbbWsSubscriptionHandle>::new(); let mut active_ws_subscriptions =
std::vec::Vec::<crate::domain::KhbbActiveWsSubscription>::new();
if config.enable_ws_slot_subscribe { if config.enable_ws_slot_subscribe {
let slot_subscribe_result = ws_client.slot_subscribe(1).await; let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result { let slot_subscribe_output = match slot_subscribe_result {
@@ -113,7 +127,12 @@ pub async fn run_listener_runtime(
subscription_id = slot_subscription_handle.subscription_id, subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription established" "slot websocket subscription established"
); );
ws_subscription_handles.push(slot_subscription_handle); active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription {
request_id: slot_subscription_handle.request_id,
subscription_id: slot_subscription_handle.subscription_id,
kind: slot_subscription_handle.kind,
label: None,
});
} }
if config.enable_ws_logs_subscribe { if config.enable_ws_logs_subscribe {
let logs_subscribe_result = ws_client let logs_subscribe_result = ws_client
@@ -170,7 +189,12 @@ pub async fn run_listener_runtime(
subscription_id = logs_subscription_handle.subscription_id, subscription_id = logs_subscription_handle.subscription_id,
"logs websocket subscription established" "logs websocket subscription established"
); );
ws_subscription_handles.push(logs_subscription_handle); active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription {
request_id: logs_subscription_handle.request_id,
subscription_id: logs_subscription_handle.subscription_id,
kind: logs_subscription_handle.kind,
label: None,
});
} }
if config.enable_ws_program_subscribe { if config.enable_ws_program_subscribe {
let mut program_request_id: u64 = 10; let mut program_request_id: u64 = 10;
@@ -231,7 +255,12 @@ pub async fn run_listener_runtime(
program_id = %program_id, program_id = %program_id,
"program websocket subscription established" "program websocket subscription established"
); );
ws_subscription_handles.push(program_subscription_handle); active_ws_subscriptions.push(crate::domain::KhbbActiveWsSubscription {
request_id: program_subscription_handle.request_id,
subscription_id: program_subscription_handle.subscription_id,
kind: program_subscription_handle.kind,
label: Some(program_id.clone()),
});
program_request_id = program_request_id.saturating_add(1); program_request_id = program_request_id.saturating_add(1);
} }
} }
@@ -341,6 +370,26 @@ pub async fn run_listener_runtime(
); );
} }
} }
let raw_json_result =
serde_json::from_str::<serde_json::Value>(&raw);
let source_subscription_id = match raw_json_result {
Ok(json_value) => {
json_value
.get("params")
.and_then(|value| value.get("subscription"))
.and_then(serde_json::Value::as_u64)
}
Err(_) => None,
};
let source_subscription = match source_subscription_id {
Some(subscription_id) => {
find_active_subscription_by_id(
&active_ws_subscriptions,
subscription_id,
)
}
None => None,
};
match method.as_str() { match method.as_str() {
"slotNotification" => { "slotNotification" => {
let parse_result = let parse_result =
@@ -350,6 +399,8 @@ pub async fn run_listener_runtime(
tracing::trace!( tracing::trace!(
listener_session_id = session.id, listener_session_id = session.id,
subscription_id = notification.params.subscription, subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
slot = notification.params.result.slot, slot = notification.params.result.slot,
parent = notification.params.result.parent, parent = notification.params.result.parent,
root = notification.params.result.root, root = notification.params.result.root,
@@ -373,6 +424,8 @@ pub async fn run_listener_runtime(
tracing::trace!( tracing::trace!(
listener_session_id = session.id, listener_session_id = session.id,
subscription_id = notification.params.subscription, subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
signature = %notification.params.result.value.signature, signature = %notification.params.result.value.signature,
"parsed logs notification" "parsed logs notification"
); );
@@ -394,6 +447,8 @@ pub async fn run_listener_runtime(
tracing::trace!( tracing::trace!(
listener_session_id = session.id, listener_session_id = session.id,
subscription_id = notification.params.subscription, subscription_id = notification.params.subscription,
source_kind = ?source_subscription.map(|value| value.kind),
source_label = ?source_subscription.and_then(|value| value.label.as_deref()),
program_pubkey = %notification.params.result.value.pubkey, program_pubkey = %notification.params.result.value.pubkey,
"parsed program notification" "parsed program notification"
); );
@@ -503,18 +558,16 @@ pub async fn run_listener_runtime(
} }
tracing::info!( tracing::info!(
listener_session_id = session.id, listener_session_id = session.id,
subscription_count = ws_subscription_handles.len(), subscription_count = active_ws_subscriptions.len(),
"starting websocket unsubscribe phase" "starting websocket unsubscribe phase"
); );
for subscription_handle in &ws_subscription_handles { for subscription in &active_ws_subscriptions {
let unsubscribe_timeout_result = tokio::time::timeout( let unsubscribe_timeout_result = tokio::time::timeout(
std::time::Duration::from_millis(500), std::time::Duration::from_millis(500),
ws_client.unsubscribe( ws_client.unsubscribe(
subscription_handle.kind, subscription.kind,
subscription_handle.subscription_id, subscription.subscription_id,
1000u64 1000u64.saturating_add(tick_count).saturating_add(subscription.request_id),
.saturating_add(tick_count)
.saturating_add(subscription_handle.request_id),
), ),
) )
.await; .await;
@@ -524,8 +577,9 @@ pub async fn run_listener_runtime(
tracing::info!( tracing::info!(
listener_session_id = session.id, listener_session_id = session.id,
unsubscribed = value, unsubscribed = value,
subscription_id = subscription_handle.subscription_id, subscription_id = subscription.subscription_id,
kind = ?subscription_handle.kind, kind = ?subscription.kind,
label = ?subscription.label,
"websocket subscription cancelled" "websocket subscription cancelled"
); );
}, },
@@ -533,8 +587,9 @@ pub async fn run_listener_runtime(
tracing::error!( tracing::error!(
listener_session_id = session.id, listener_session_id = session.id,
error = %error, error = %error,
subscription_id = subscription_handle.subscription_id, subscription_id = subscription.subscription_id,
kind = ?subscription_handle.kind, kind = ?subscription.kind,
label = ?subscription.label,
"failed to cancel websocket subscription" "failed to cancel websocket subscription"
); );
}, },
@@ -542,8 +597,9 @@ pub async fn run_listener_runtime(
Err(_) => { Err(_) => {
tracing::error!( tracing::error!(
listener_session_id = session.id, listener_session_id = session.id,
subscription_id = subscription_handle.subscription_id, subscription_id = subscription.subscription_id,
kind = ?subscription_handle.kind, kind = ?subscription.kind,
label = ?subscription.label,
"websocket unsubscribe timed out" "websocket unsubscribe timed out"
); );
}, },

View File

@@ -17,7 +17,7 @@ pub struct KhbbSolanaWsRpcClientConfig {
} }
/// Supported WebSocket subscription kinds for the initial implementation. /// Supported WebSocket subscription kinds for the initial implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum KhbbWsSubscriptionKind { pub enum KhbbWsSubscriptionKind {
/// `slotSubscribe` /// `slotSubscribe`
Slot, Slot,