This commit is contained in:
2026-04-22 08:21:49 +02:00
parent e754cb63bf
commit a9d26750fa
7 changed files with 235 additions and 59 deletions

View File

@@ -30,6 +30,11 @@ pub(crate) struct KbDemoWsStatusPayload {
current_subscribe_method: std::option::Option<std::string::String>,
current_unsubscribe_method: std::option::Option<std::string::String>,
current_notification_method: std::option::Option<std::string::String>,
event_count_total: u64,
notification_count_total: u64,
ui_log_count: u64,
suppressed_log_count: u64,
last_event_kind: std::option::Option<std::string::String>,
}
/// Subscribe request sent by the demo frontend.
@@ -53,6 +58,12 @@ pub(crate) struct KbDemoWsRuntimeState {
endpoint_url: std::option::Option<std::string::String>,
connection_state: kb_lib::KbConnectionState,
current_subscription: std::option::Option<kb_lib::WsSubscriptionInfo>,
event_count_total: u64,
notification_count_total: u64,
ui_log_count: u64,
suppressed_log_count: u64,
last_event_kind: std::option::Option<std::string::String>,
last_status_emit_at: std::option::Option<std::time::Instant>,
}
impl KbDemoWsRuntimeState {
@@ -66,6 +77,12 @@ impl KbDemoWsRuntimeState {
endpoint_url: None,
connection_state: kb_lib::KbConnectionState::Disconnected,
current_subscription: None,
event_count_total: 0,
notification_count_total: 0,
ui_log_count: 0,
suppressed_log_count: 0,
last_event_kind: None,
last_status_emit_at: None,
}
}
@@ -94,6 +111,11 @@ impl KbDemoWsRuntimeState {
current_subscribe_method,
current_unsubscribe_method,
current_notification_method,
event_count_total: self.event_count_total,
notification_count_total: self.notification_count_total,
ui_log_count: self.ui_log_count,
suppressed_log_count: self.suppressed_log_count,
last_event_kind: self.last_event_kind.clone(),
}
}
@@ -105,6 +127,12 @@ impl KbDemoWsRuntimeState {
self.endpoint_url = None;
self.connection_state = kb_lib::KbConnectionState::Disconnected;
self.current_subscription = None;
self.event_count_total = 0;
self.notification_count_total = 0;
self.ui_log_count = 0;
self.suppressed_log_count = 0;
self.last_event_kind = None;
self.last_status_emit_at = None;
}
}
@@ -154,10 +182,20 @@ pub(crate) async fn demo_ws_list_endpoints(
) -> Result<std::vec::Vec<KbDemoWsEndpointSummary>, std::string::String> {
let mut endpoints = std::vec::Vec::new();
for endpoint in &state.config.solana.ws_endpoints {
if !endpoint.enabled {
continue;
}
let resolved_url_result = endpoint.resolved_url();
let resolved_url = match resolved_url_result {
Ok(resolved_url) => resolved_url,
Err(_) => endpoint.url.clone(),
Err(error) => {
tracing::warn!(
endpoint_name = %endpoint.name,
"cannot resolve ws endpoint url from environment: {}",
error
);
format!("UNRESOLVED_ENV [{}] {}", endpoint.name, endpoint.url)
}
};
endpoints.push(KbDemoWsEndpointSummary {
name: endpoint.name.clone(),
@@ -231,9 +269,14 @@ pub(crate) async fn demo_ws_connect(
let recv_result = event_receiver.recv().await;
match recv_result {
Ok(event) => {
kb_apply_demo_ws_event_to_runtime(&relay_runtime, &event).await;
kb_emit_demo_ws_log(&relay_app_handle, &kb_format_demo_ws_event(&event));
kb_emit_demo_ws_status(&relay_app_handle, &relay_runtime).await;
let (emit_ui_log, emit_ui_status) =
kb_register_demo_ws_event_and_decide_emission(&relay_runtime, &event).await;
if emit_ui_log {
kb_emit_demo_ws_log(&relay_app_handle, &kb_format_demo_ws_event(&event));
}
if emit_ui_status {
kb_emit_demo_ws_status(&relay_app_handle, &relay_runtime).await;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
kb_emit_demo_ws_log(
@@ -306,10 +349,7 @@ pub(crate) async fn demo_ws_disconnect(
if let Some(client) = &client_option {
let disconnect_result = client.disconnect().await;
if let Err(error) = disconnect_result {
kb_emit_demo_ws_log(
&app_handle,
&format!("[demo] disconnect error: {}", error),
);
kb_emit_demo_ws_log(&app_handle, &format!("[demo] disconnect error: {}", error));
}
}
if let Some(relay_task) = relay_task_option {
@@ -581,11 +621,22 @@ where
}
}
async fn kb_apply_demo_ws_event_to_runtime(
async fn kb_register_demo_ws_event_and_decide_emission(
runtime_arc: &std::sync::Arc<tokio::sync::Mutex<KbDemoWsRuntimeState>>,
event: &kb_lib::WsEvent,
) {
) -> (bool, bool) {
let mut runtime_guard = runtime_arc.lock().await;
runtime_guard.event_count_total = runtime_guard.event_count_total.saturating_add(1);
runtime_guard.last_event_kind = Some(kb_demo_ws_event_kind_name(event).to_string());
let mut emit_ui_log = true;
let force_status_emit = matches!(
event,
kb_lib::WsEvent::Connected { .. }
| kb_lib::WsEvent::Disconnected { .. }
| kb_lib::WsEvent::SubscriptionRegistered { .. }
| kb_lib::WsEvent::SubscriptionUnregistered { .. }
| kb_lib::WsEvent::Error { .. }
);
match event {
kb_lib::WsEvent::Connected {
endpoint_name,
@@ -597,6 +648,35 @@ async fn kb_apply_demo_ws_event_to_runtime(
}
kb_lib::WsEvent::SubscriptionRegistered { subscription, .. } => {
runtime_guard.current_subscription = Some(subscription.clone());
runtime_guard.notification_count_total = 0;
}
kb_lib::WsEvent::SubscriptionNotification { subscription, .. } => {
runtime_guard.notification_count_total =
runtime_guard.notification_count_total.saturating_add(1);
let subscribe_method = subscription.subscribe_method.as_str();
let notif_count = runtime_guard.notification_count_total;
if subscribe_method == "logsSubscribe" || subscribe_method == "programSubscribe" {
emit_ui_log = notif_count % 100 == 1;
} else if subscribe_method == "slotsUpdatesSubscribe" {
emit_ui_log = notif_count % 20 == 1;
}
}
kb_lib::WsEvent::TextMessage { .. } | kb_lib::WsEvent::JsonRpcMessage { .. } => {
let subscribe_method_option = runtime_guard
.current_subscription
.as_ref()
.map(|subscription| subscription.subscribe_method.as_str());
if let Some(subscribe_method) = subscribe_method_option {
if subscribe_method == "logsSubscribe"
|| subscribe_method == "programSubscribe"
|| subscribe_method == "slotsUpdatesSubscribe"
{
emit_ui_log = false;
}
}
}
kb_lib::WsEvent::Pong { .. } => {
emit_ui_log = false;
}
kb_lib::WsEvent::SubscriptionUnregistered {
subscription_id, ..
@@ -605,9 +685,9 @@ async fn kb_apply_demo_ws_event_to_runtime(
.current_subscription
.as_ref()
.map(|subscription| subscription.subscription_id);
if current_subscription_id == Some(*subscription_id) {
runtime_guard.current_subscription = None;
runtime_guard.notification_count_total = 0;
}
}
kb_lib::WsEvent::Disconnected { .. } => {
@@ -616,9 +696,30 @@ async fn kb_apply_demo_ws_event_to_runtime(
runtime_guard.keepalive_task = None;
runtime_guard.connection_state = kb_lib::KbConnectionState::Disconnected;
runtime_guard.current_subscription = None;
runtime_guard.notification_count_total = 0;
}
_ => {}
}
if emit_ui_log {
runtime_guard.ui_log_count = runtime_guard.ui_log_count.saturating_add(1);
} else {
runtime_guard.suppressed_log_count = runtime_guard.suppressed_log_count.saturating_add(1);
}
let now = std::time::Instant::now();
let emit_ui_status = if force_status_emit {
true
} else {
match runtime_guard.last_status_emit_at {
Some(last_status_emit_at) => {
now.duration_since(last_status_emit_at) >= std::time::Duration::from_millis(250)
}
None => true,
}
};
if emit_ui_status {
runtime_guard.last_status_emit_at = Some(now);
}
(emit_ui_log, emit_ui_status)
}
async fn kb_emit_demo_ws_status(
@@ -846,3 +947,22 @@ async fn kb_demo_ws_keepalive_loop(app_handle: &tauri::AppHandle, client: &kb_li
}
}
}
fn kb_demo_ws_event_kind_name(event: &kb_lib::WsEvent) -> &'static str {
match event {
kb_lib::WsEvent::Connected { .. } => "connected",
kb_lib::WsEvent::TextMessage { .. } => "text_message",
kb_lib::WsEvent::JsonRpcMessage { .. } => "json_rpc_message",
kb_lib::WsEvent::JsonRpcParseError { .. } => "json_rpc_parse_error",
kb_lib::WsEvent::SubscriptionRegistered { .. } => "subscription_registered",
kb_lib::WsEvent::SubscriptionNotification { .. } => "subscription_notification",
kb_lib::WsEvent::JsonRpcNotificationWithoutSubscription { .. } => "untracked_notification",
kb_lib::WsEvent::SubscriptionUnregistered { .. } => "subscription_unregistered",
kb_lib::WsEvent::BinaryMessage { .. } => "binary_message",
kb_lib::WsEvent::Ping { .. } => "ping",
kb_lib::WsEvent::Pong { .. } => "pong",
kb_lib::WsEvent::CloseReceived { .. } => "close_received",
kb_lib::WsEvent::Disconnected { .. } => "disconnected",
kb_lib::WsEvent::Error { .. } => "error",
}
}