This commit is contained in:
2026-05-05 05:03:11 +02:00
parent 3e994995d7
commit f2c227e08f
132 changed files with 5767 additions and 4461 deletions

View File

@@ -93,13 +93,10 @@ impl WsManager {
spawn_event_forward_task(client.clone(), event_tx.clone());
clients.insert(
endpoint.name.clone(),
WsManagedClient {
client,
event_forward_abort_handle,
},
WsManagedClient { client, event_forward_abort_handle },
);
}
Ok(Self {
return Ok(Self {
clients: tokio::sync::Mutex::new(clients),
event_tx,
detection_relay_sender: tokio::sync::Mutex::new(None),
@@ -107,23 +104,23 @@ impl WsManager {
transaction_resolution_relay_sender: tokio::sync::Mutex::new(None),
transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None),
hybrid_observation_relay_abort_handle: tokio::sync::Mutex::new(None),
})
});
}
/// Builds one manager from the application configuration.
pub fn from_config(config: &crate::KbConfig) -> Result<Self, crate::KbError> {
Self::from_ws_endpoints(&config.solana.ws_endpoints)
return Self::from_ws_endpoints(&config.solana.ws_endpoints);
}
/// Returns a unified broadcast receiver for all managed client events.
pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<crate::WsEvent> {
self.event_tx.subscribe()
return self.event_tx.subscribe();
}
/// Returns the list of managed endpoint names.
pub async fn endpoint_names(&self) -> std::vec::Vec<std::string::String> {
let clients_guard = self.clients.lock().await;
clients_guard.keys().cloned().collect()
return clients_guard.keys().cloned().collect();
}
/// Returns the list of managed endpoint names having the requested role.
@@ -136,12 +133,12 @@ impl WsManager {
.endpoint_config()
.roles
.iter()
.any(|configured_role| configured_role == role)
.any(|configured_role| return configured_role == role)
{
endpoint_names.push(endpoint_name.clone());
}
}
endpoint_names
return endpoint_names;
}
/// Starts all managed endpoints having the requested role.
@@ -158,7 +155,7 @@ impl WsManager {
started_count += 1;
}
}
Ok(started_count)
return Ok(started_count);
}
/// Stops all managed endpoints having the requested role.
@@ -175,7 +172,7 @@ impl WsManager {
stopped_count += 1;
}
}
Ok(stopped_count)
return Ok(stopped_count);
}
/// Returns one managed client by endpoint name.
@@ -183,8 +180,8 @@ impl WsManager {
let clients_guard = self.clients.lock().await;
let managed_option = clients_guard.get(endpoint_name);
match managed_option {
Some(managed) => Some(managed.client.clone()),
None => None,
Some(managed) => return Some(managed.client.clone()),
None => return None,
}
}
@@ -197,7 +194,7 @@ impl WsManager {
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
},
};
let state = client.connection_state().await;
if state == crate::KbConnectionState::Connected
@@ -217,23 +214,21 @@ impl WsManager {
sender_guard.clone()
};
if let Some(sender) = tx_resolution_sender_option {
client
.set_transaction_resolution_notification_forwarder(sender)
.await;
client.set_transaction_resolution_notification_forwarder(sender).await;
}
let connect_result = client.connect().await;
if let Err(error) = connect_result {
return Err(error);
}
Ok(true)
return Ok(true);
}
/// Starts one managed endpoint.
pub async fn start_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
let start_result = self.start_endpoint_inner(endpoint_name).await;
match start_result {
Ok(_) => Ok(()),
Err(error) => Err(error),
Ok(_) => return Ok(()),
Err(error) => return Err(error),
}
}
@@ -246,7 +241,7 @@ impl WsManager {
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
},
};
let state = client.connection_state().await;
if state == crate::KbConnectionState::Disconnected
@@ -255,22 +250,20 @@ impl WsManager {
return Ok(false);
}
client.clear_detection_notification_forwarder().await;
client
.clear_transaction_resolution_notification_forwarder()
.await;
client.clear_transaction_resolution_notification_forwarder().await;
let disconnect_result = client.disconnect().await;
if let Err(error) = disconnect_result {
return Err(error);
}
Ok(true)
return Ok(true);
}
/// Stops one managed endpoint.
pub async fn stop_endpoint(&self, endpoint_name: &str) -> Result<(), crate::KbError> {
let stop_result = self.stop_endpoint_inner(endpoint_name).await;
match stop_result {
Ok(_) => Ok(()),
Err(error) => Err(error),
Ok(_) => return Ok(()),
Err(error) => return Err(error),
}
}
@@ -288,7 +281,7 @@ impl WsManager {
started_count += 1;
}
}
Ok(started_count)
return Ok(started_count);
}
/// Stops all managed endpoints.
@@ -305,7 +298,7 @@ impl WsManager {
stopped_count += 1;
}
}
Ok(stopped_count)
return Ok(stopped_count);
}
/// Returns the number of active subscriptions for one endpoint.
@@ -321,9 +314,9 @@ impl WsManager {
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
},
};
Ok(client.active_subscription_count().await)
return Ok(client.active_subscription_count().await);
}
/// Returns a consolidated snapshot of all managed endpoints.
@@ -353,11 +346,11 @@ impl WsManager {
active_subscription_count,
});
}
Ok(WsManagerSnapshot {
return Ok(WsManagerSnapshot {
endpoint_count: endpoints.len(),
started_count,
endpoints,
})
});
}
/// Attaches one shared detection relay to all managed clients.
@@ -398,11 +391,9 @@ impl WsManager {
values
};
for client in clients {
client
.set_detection_notification_forwarder(sender.clone())
.await;
client.set_detection_notification_forwarder(sender.clone()).await;
}
Ok(())
return Ok(());
}
/// Detaches the shared detection relay from all managed clients.
@@ -429,13 +420,13 @@ impl WsManager {
if let Some(abort_handle) = abort_handle_option {
abort_handle.abort();
}
Ok(())
return Ok(());
}
/// Returns whether one managed endpoint exists.
pub async fn has_endpoint(&self, endpoint_name: &str) -> bool {
let clients_guard = self.clients.lock().await;
clients_guard.contains_key(endpoint_name)
return clients_guard.contains_key(endpoint_name);
}
/// Returns the current connection state of one endpoint.
@@ -451,9 +442,9 @@ impl WsManager {
"unknown managed websocket endpoint '{}'",
endpoint_name
)));
}
},
};
Ok(client.connection_state().await)
return Ok(client.connection_state().await);
}
/// Attaches one shared transaction-resolution relay to all managed clients.
@@ -494,11 +485,9 @@ impl WsManager {
values
};
for client in clients {
client
.set_transaction_resolution_notification_forwarder(sender.clone())
.await;
client.set_transaction_resolution_notification_forwarder(sender.clone()).await;
}
Ok(())
return Ok(());
}
/// Detaches the shared transaction-resolution relay from all managed clients.
@@ -512,9 +501,7 @@ impl WsManager {
values
};
for client in clients {
client
.clear_transaction_resolution_notification_forwarder()
.await;
client.clear_transaction_resolution_notification_forwarder().await;
}
{
let mut sender_guard = self.transaction_resolution_relay_sender.lock().await;
@@ -527,7 +514,7 @@ impl WsManager {
if let Some(abort_handle) = abort_handle_option {
abort_handle.abort();
}
Ok(())
return Ok(());
}
/// Collects the current hybrid WebSocket watch snapshot.
@@ -536,7 +523,7 @@ impl WsManager {
database: std::sync::Arc<crate::KbDatabase>,
) -> Result<crate::KbWsHybridWatchSnapshot, crate::KbError> {
let runtime = crate::KbWsHybridRuntimeService::new(database);
runtime.collect_watch_snapshot().await
return runtime.collect_watch_snapshot().await;
}
/// Attaches one shared hybrid observation relay to the manager event stream.
@@ -559,7 +546,7 @@ impl WsManager {
let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
*abort_guard = Some(abort_handle);
}
Ok(())
return Ok(());
}
/// Detaches the shared hybrid observation relay from the manager event stream.
@@ -571,7 +558,7 @@ impl WsManager {
if let Some(abort_handle) = abort_handle_option {
abort_handle.abort();
}
Ok(())
return Ok(());
}
}
@@ -615,15 +602,15 @@ fn spawn_event_forward_task(
match recv_result {
Ok(event) => {
let _ = event_tx.send(event);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
},
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {},
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
},
}
}
});
task.abort_handle()
return task.abort_handle();
}
fn spawn_hybrid_observation_relay_task(
@@ -636,16 +623,15 @@ fn spawn_hybrid_observation_relay_task(
match recv_result {
Ok(event) => {
handle_hybrid_observation_manager_event(&runtime, event).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
},
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {},
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
},
}
}
});
task.abort_handle()
return task.abort_handle();
}
async fn handle_hybrid_observation_manager_event(
@@ -666,11 +652,8 @@ async fn handle_hybrid_observation_manager_event(
notification,
)
.await;
}
crate::WsEvent::JsonRpcNotificationWithoutSubscription {
endpoint_name,
notification,
} => {
},
crate::WsEvent::JsonRpcNotificationWithoutSubscription { endpoint_name, notification } => {
if notification.method == "logsNotification" {
let value_result = serde_json::to_value(notification.clone());
let value = match value_result {
@@ -682,11 +665,10 @@ async fn handle_hybrid_observation_manager_event(
error
);
return;
}
},
};
let record_result = runtime
.record_logs_notification(Some(endpoint_name), &value)
.await;
let record_result =
runtime.record_logs_notification(Some(endpoint_name), &value).await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
@@ -695,8 +677,8 @@ async fn handle_hybrid_observation_manager_event(
);
}
}
}
_ => {}
},
_ => {},
}
}
@@ -716,13 +698,11 @@ async fn handle_hybrid_subscription_notification(
error
);
return;
}
},
};
let method = notification.method.as_str();
if method == "logsNotification" {
let record_result = runtime
.record_logs_notification(Some(endpoint_name), &value)
.await;
let record_result = runtime.record_logs_notification(Some(endpoint_name), &value).await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
@@ -742,7 +722,7 @@ async fn handle_hybrid_subscription_notification(
"missing watched program id in subscription params"
);
return;
}
},
};
let record_result = runtime
.record_program_notification(Some(endpoint_name), watched_program_id, &value)
@@ -766,7 +746,7 @@ async fn handle_hybrid_subscription_notification(
"missing watched account in subscription params"
);
return;
}
},
};
let record_result = runtime
.record_account_notification(Some(endpoint_name), watched_account, &value)
@@ -791,8 +771,8 @@ fn kb_first_subscription_param_as_string(
};
let text_option = first_param.as_str();
match text_option {
Some(text) => Some(text.to_string()),
None => None,
Some(text) => return Some(text.to_string()),
None => return None,
}
}
@@ -881,10 +861,10 @@ mod tests {
}
}
});
Self {
return Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
};
}
async fn spawn_json_rpc_server() -> Self {
@@ -1023,10 +1003,10 @@ mod tests {
}
}
});
Self {
return Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
};
}
async fn spawn_hybrid_json_rpc_server() -> Self {
@@ -1230,10 +1210,10 @@ mod tests {
}
}
});
Self {
return Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
};
}
async fn shutdown(mut self) {
@@ -1248,7 +1228,7 @@ mod tests {
url: std::string::String,
enabled: bool,
) -> crate::KbWsEndpointConfig {
crate::KbWsEndpointConfig {
return crate::KbWsEndpointConfig {
name: name.to_string(),
enabled,
provider: "test".to_string(),
@@ -1262,7 +1242,7 @@ mod tests {
write_channel_capacity: 32,
event_channel_capacity: 64,
auto_reconnect: false,
}
};
}
async fn recv_manager_event(
@@ -1275,7 +1255,7 @@ mod tests {
Err(_) => panic!("manager event receive timeout"),
};
match recv_result {
Ok(event) => event,
Ok(event) => return event,
Err(error) => panic!("manager event receive failed: {error}"),
}
}
@@ -1301,7 +1281,7 @@ mod tests {
};
let database_result = crate::KbDatabase::connect_and_initialize(&config).await;
match database_result {
Ok(database) => database,
Ok(database) => return database,
Err(error) => panic!("database init failed: {error}"),
}
}
@@ -1385,10 +1365,7 @@ mod tests {
assert_eq!(snapshot.started_count, 1);
assert_eq!(snapshot.endpoints.len(), 1);
assert_eq!(snapshot.endpoints[0].endpoint_name, "ws_a");
assert_eq!(
snapshot.endpoints[0].state,
crate::KbConnectionState::Connected
);
assert_eq!(snapshot.endpoints[0].state, crate::KbConnectionState::Connected);
let stop_result = manager.stop_all().await;
if let Err(error) = stop_result {
panic!("stop_all failed: {error}");
@@ -1571,11 +1548,7 @@ mod tests {
#[tokio::test]
async fn collect_hybrid_watch_snapshot_reads_dexes_and_active_pools() {
let endpoints = vec![make_ws_endpoint(
"ws_a",
"ws://127.0.0.1:1".to_string(),
true,
)];
let endpoints = vec![make_ws_endpoint("ws_a", "ws://127.0.0.1:1".to_string(), true)];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
@@ -1594,10 +1567,7 @@ mod tests {
assert_eq!(snapshot.program_targets.len(), 3);
assert_eq!(snapshot.account_targets.len(), 1);
assert_eq!(
snapshot.account_targets[0].address,
"HybridPool111".to_string()
);
assert_eq!(snapshot.account_targets[0].address, "HybridPool111".to_string());
}
#[tokio::test]
@@ -1611,9 +1581,7 @@ mod tests {
};
let database = create_database().await;
let database = std::sync::Arc::new(database);
let attach_result = manager
.attach_hybrid_observation_relay(database.clone())
.await;
let attach_result = manager.attach_hybrid_observation_relay(database.clone()).await;
if let Err(error) = attach_result {
panic!("attach_hybrid_observation_relay failed: {error}");
}
@@ -1626,21 +1594,17 @@ mod tests {
Some(client) => client,
None => panic!("client must exist"),
};
let logs_subscribe_result = client
.logs_subscribe_raw(serde_json::json!("all"), None)
.await;
let logs_subscribe_result = client.logs_subscribe_raw(serde_json::json!("all"), None).await;
if let Err(error) = logs_subscribe_result {
panic!("logsSubscribe failed: {error}");
}
let program_subscribe_result = client
.program_subscribe_raw("HybridProgram111".to_string(), None)
.await;
let program_subscribe_result =
client.program_subscribe_raw("HybridProgram111".to_string(), None).await;
if let Err(error) = program_subscribe_result {
panic!("programSubscribe failed: {error}");
}
let account_subscribe_result = client
.account_subscribe_raw("HybridPool111".to_string(), None)
.await;
let account_subscribe_result =
client.account_subscribe_raw("HybridPool111".to_string(), None).await;
if let Err(error) = account_subscribe_result {
panic!("accountSubscribe failed: {error}");
}