This commit is contained in:
2026-04-29 20:07:18 +02:00
parent 0b36caf77d
commit 0f228b2ae5
8 changed files with 1650 additions and 20 deletions

View File

@@ -53,6 +53,8 @@ pub struct WsManager {
>,
transaction_resolution_relay_abort_handle:
tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
hybrid_observation_relay_abort_handle:
tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
}
impl WsManager {
@@ -104,6 +106,7 @@ impl WsManager {
detection_relay_abort_handle: tokio::sync::Mutex::new(None),
transaction_resolution_relay_sender: tokio::sync::Mutex::new(None),
transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None),
hybrid_observation_relay_abort_handle: tokio::sync::Mutex::new(None),
})
}
@@ -469,11 +472,7 @@ impl WsManager {
));
}
}
let resolver = crate::KbTransactionResolutionService::new(
http_pool,
database,
http_role,
);
let resolver = crate::KbTransactionResolutionService::new(http_pool, database, http_role);
let relay = crate::KbWsTransactionResolutionRelay::new(resolver);
let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity);
let relay_task = relay.spawn(receiver);
@@ -530,6 +529,50 @@ impl WsManager {
}
Ok(())
}
/// Collects the current hybrid WebSocket watch snapshot.
pub async fn collect_hybrid_watch_snapshot(
&self,
database: std::sync::Arc<crate::KbDatabase>,
) -> Result<crate::KbWsHybridWatchSnapshot, crate::KbError> {
let runtime = crate::KbWsHybridRuntimeService::new(database);
runtime.collect_watch_snapshot().await
}
/// Attaches one shared hybrid observation relay to the manager event stream.
pub async fn attach_hybrid_observation_relay(
&self,
database: std::sync::Arc<crate::KbDatabase>,
) -> Result<(), crate::KbError> {
{
let abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
if abort_guard.is_some() {
return Err(crate::KbError::InvalidState(
"websocket hybrid observation relay is already attached".to_string(),
));
}
}
let runtime = crate::KbWsHybridRuntimeService::new(database);
let receiver = self.subscribe_events();
let abort_handle = spawn_hybrid_observation_relay_task(receiver, runtime);
{
let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
*abort_guard = Some(abort_handle);
}
Ok(())
}
/// Detaches the shared hybrid observation relay from the manager event stream.
pub async fn detach_hybrid_observation_relay(&self) -> Result<(), crate::KbError> {
let abort_handle_option = {
let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
abort_guard.take()
};
if let Some(abort_handle) = abort_handle_option {
abort_handle.abort();
}
Ok(())
}
}
impl Drop for WsManager {
@@ -552,6 +595,12 @@ impl Drop for WsManager {
abort_handle.abort();
}
}
let hybrid_abort_result = self.hybrid_observation_relay_abort_handle.try_lock();
if let Ok(mut abort_guard) = hybrid_abort_result {
if let Some(abort_handle) = abort_guard.take() {
abort_handle.abort();
}
}
}
}
@@ -574,10 +623,179 @@ fn spawn_event_forward_task(
}
}
});
task.abort_handle()
}
fn spawn_hybrid_observation_relay_task(
mut receiver: tokio::sync::broadcast::Receiver<crate::WsEvent>,
runtime: crate::KbWsHybridRuntimeService,
) -> tokio::task::AbortHandle {
let task = tokio::spawn(async move {
loop {
let recv_result = receiver.recv().await;
match recv_result {
Ok(event) => {
handle_hybrid_observation_manager_event(&runtime, event).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
});
task.abort_handle()
}
async fn handle_hybrid_observation_manager_event(
runtime: &crate::KbWsHybridRuntimeService,
event: crate::WsEvent,
) {
match event {
crate::WsEvent::SubscriptionNotification {
endpoint_name,
subscription,
notification,
..
} => {
handle_hybrid_subscription_notification(
runtime,
endpoint_name,
subscription,
notification,
)
.await;
}
crate::WsEvent::JsonRpcNotificationWithoutSubscription {
endpoint_name,
notification,
} => {
if notification.method == "logsNotification" {
let value_result = serde_json::to_value(notification.clone());
let value = match value_result {
Ok(value) => value,
Err(error) => {
tracing::warn!(
target: "kb_lib::ws_manager",
"cannot serialize logs notification for hybrid relay: {}",
error
);
return;
}
};
let record_result = runtime
.record_logs_notification(Some(endpoint_name), &value)
.await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
"hybrid logs observation failed: {}",
error
);
}
}
}
_ => {}
}
}
async fn handle_hybrid_subscription_notification(
runtime: &crate::KbWsHybridRuntimeService,
endpoint_name: std::string::String,
subscription: crate::WsSubscriptionInfo,
notification: crate::KbJsonRpcWsNotification,
) {
let value_result = serde_json::to_value(notification.clone());
let value = match value_result {
Ok(value) => value,
Err(error) => {
tracing::warn!(
target: "kb_lib::ws_manager",
"cannot serialize subscription notification for hybrid relay: {}",
error
);
return;
}
};
let method = notification.method.as_str();
if method == "logsNotification" {
let record_result = runtime
.record_logs_notification(Some(endpoint_name), &value)
.await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
"hybrid logs observation failed: {}",
error
);
}
return;
}
if method == "programNotification" {
let watched_program_id = kb_first_subscription_param_as_string(&subscription);
let watched_program_id = match watched_program_id {
Some(watched_program_id) => watched_program_id,
None => {
tracing::warn!(
target: "kb_lib::ws_manager",
"missing watched program id in subscription params"
);
return;
}
};
let record_result = runtime
.record_program_notification(Some(endpoint_name), watched_program_id, &value)
.await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
"hybrid program observation failed: {}",
error
);
}
return;
}
if method == "accountNotification" {
let watched_account = kb_first_subscription_param_as_string(&subscription);
let watched_account = match watched_account {
Some(watched_account) => watched_account,
None => {
tracing::warn!(
target: "kb_lib::ws_manager",
"missing watched account in subscription params"
);
return;
}
};
let record_result = runtime
.record_account_notification(Some(endpoint_name), watched_account, &value)
.await;
if let Err(error) = record_result {
tracing::warn!(
target: "kb_lib::ws_manager",
"hybrid account observation failed: {}",
error
);
}
}
}
fn kb_first_subscription_param_as_string(
subscription: &crate::WsSubscriptionInfo,
) -> std::option::Option<std::string::String> {
let first_param_option = subscription.params.first();
let first_param = match first_param_option {
Some(first_param) => first_param,
None => return None,
};
let text_option = first_param.as_str();
match text_option {
Some(text) => Some(text.to_string()),
None => None,
}
}
#[cfg(test)]
mod tests {
#[derive(Debug)]
@@ -811,6 +1029,213 @@ mod tests {
}
}
async fn spawn_hybrid_json_rpc_server() -> Self {
let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
let listener = match listener_result {
Ok(listener) => listener,
Err(error) => panic!("tcp bind failed: {error}"),
};
let local_addr_result = listener.local_addr();
let local_addr = match local_addr_result {
Ok(local_addr) => local_addr,
Err(error) => panic!("local_addr failed: {error}"),
};
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
accept_result = listener.accept() => {
let (stream, _) = match accept_result {
Ok(pair) => pair,
Err(_) => break,
};
tokio::spawn(async move {
let accept_result = tokio_tungstenite::accept_async(stream).await;
let mut websocket = match accept_result {
Ok(websocket) => websocket,
Err(_) => return,
};
loop {
let next_result = futures_util::StreamExt::next(&mut websocket).await;
let message_result = match next_result {
Some(message_result) => message_result,
None => break,
};
let message = match message_result {
Ok(message) => message,
Err(_) => break,
};
match message {
tokio_tungstenite::tungstenite::Message::Text(text) => {
let parse_result: Result<serde_json::Value, _> =
serde_json::from_str(text.as_str());
let request_value = match parse_result {
Ok(request_value) => request_value,
Err(_) => continue,
};
let method_option = request_value
.get("method")
.and_then(serde_json::Value::as_str);
let method = match method_option {
Some(method) => method,
None => continue,
};
let id_value = match request_value.get("id") {
Some(id_value) => id_value.clone(),
None => serde_json::Value::from(1_u64),
};
if method == "logsSubscribe" {
let success_text = serde_json::json!({
"jsonrpc": "2.0",
"result": 701_u64,
"id": id_value,
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
).await;
if send_result.is_err() {
break;
}
let notification_text = serde_json::json!({
"jsonrpc": "2.0",
"method": "logsNotification",
"params": {
"result": {
"context": {
"slot": 1001_u64
},
"value": {
"signature": "HybridLogsSig111",
"err": null,
"logs": [
"Program log: Instruction: Swap"
]
}
},
"subscription": 701_u64
}
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
).await;
if send_result.is_err() {
break;
}
} else if method == "programSubscribe" {
let success_text = serde_json::json!({
"jsonrpc": "2.0",
"result": 702_u64,
"id": id_value,
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
).await;
if send_result.is_err() {
break;
}
let notification_text = serde_json::json!({
"jsonrpc": "2.0",
"method": "programNotification",
"params": {
"result": {
"context": {
"slot": 1002_u64
},
"value": {
"pubkey": "HybridProgramOwned111",
"account": {
"lamports": 1,
"owner": "HybridProgram111"
}
}
},
"subscription": 702_u64
}
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
).await;
if send_result.is_err() {
break;
}
} else if method == "accountSubscribe" {
let success_text = serde_json::json!({
"jsonrpc": "2.0",
"result": 703_u64,
"id": id_value,
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
).await;
if send_result.is_err() {
break;
}
let notification_text = serde_json::json!({
"jsonrpc": "2.0",
"method": "accountNotification",
"params": {
"result": {
"context": {
"slot": 1003_u64
},
"value": {
"lamports": 10,
"owner": "HybridProgram111"
}
},
"subscription": 703_u64
}
}).to_string();
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
).await;
if send_result.is_err() {
break;
}
}
}
tokio_tungstenite::tungstenite::Message::Ping(data) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Pong(data),
).await;
if send_result.is_err() {
break;
}
}
tokio_tungstenite::tungstenite::Message::Close(frame) => {
let send_result = futures_util::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Close(frame),
).await;
let _ = send_result;
break;
}
tokio_tungstenite::tungstenite::Message::Binary(_) => {}
tokio_tungstenite::tungstenite::Message::Pong(_) => {}
tokio_tungstenite::tungstenite::Message::Frame(_) => {}
}
}
});
}
}
}
});
Self {
url: format!("ws://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
@@ -1096,4 +1521,151 @@ mod tests {
assert_eq!(program_endpoints, vec!["ws_b".to_string()]);
assert!(unknown_endpoints.is_empty());
}
async fn seed_hybrid_watch_database(database: &crate::KbDatabase) {
let dex_a = crate::KbDexDto::new(
"raydium_amm_v4".to_string(),
"Raydium AmmV4".to_string(),
Some("HybridRaydiumProgram111".to_string()),
None,
true,
);
let dex_a_id_result = crate::upsert_dex(database, &dex_a).await;
let dex_a_id = match dex_a_id_result {
Ok(dex_a_id) => dex_a_id,
Err(error) => panic!("dex A upsert failed: {error}"),
};
let dex_b = crate::KbDexDto::new(
"meteora_dbc".to_string(),
"Meteora DBC".to_string(),
Some("HybridMeteoraProgram111".to_string()),
Some("HybridSharedRouter111".to_string()),
true,
);
let dex_b_id_result = crate::upsert_dex(database, &dex_b).await;
let dex_b_id = match dex_b_id_result {
Ok(dex_b_id) => dex_b_id,
Err(error) => panic!("dex B upsert failed: {error}"),
};
let pool_a = crate::KbPoolDto::new(
dex_a_id,
"HybridPool111".to_string(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Active,
);
let pool_a_result = crate::upsert_pool(database, &pool_a).await;
if let Err(error) = pool_a_result {
panic!("pool A upsert failed: {error}");
}
let pool_b = crate::KbPoolDto::new(
dex_b_id,
"HybridPool222".to_string(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Inactive,
);
let pool_b_result = crate::upsert_pool(database, &pool_b).await;
if let Err(error) = pool_b_result {
panic!("pool B upsert failed: {error}");
}
}
#[tokio::test]
async fn collect_hybrid_watch_snapshot_reads_dexes_and_active_pools() {
let endpoints = vec![make_ws_endpoint(
"ws_a",
"ws://127.0.0.1:1".to_string(),
true,
)];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let database = create_database().await;
seed_hybrid_watch_database(&database).await;
let database = std::sync::Arc::new(database);
let snapshot_result = manager.collect_hybrid_watch_snapshot(database).await;
let snapshot = match snapshot_result {
Ok(snapshot) => snapshot,
Err(error) => panic!("collect_hybrid_watch_snapshot failed: {error}"),
};
assert_eq!(snapshot.program_targets.len(), 3);
assert_eq!(snapshot.account_targets.len(), 1);
assert_eq!(
snapshot.account_targets[0].address,
"HybridPool111".to_string()
);
}
#[tokio::test]
async fn attach_hybrid_observation_relay_records_logs_program_and_account_notifications() {
let server = TestWsServer::spawn_hybrid_json_rpc_server().await;
let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)];
let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
let manager = match manager_result {
Ok(manager) => manager,
Err(error) => panic!("from_ws_endpoints failed: {error}"),
};
let database = create_database().await;
let database = std::sync::Arc::new(database);
let attach_result = manager
.attach_hybrid_observation_relay(database.clone())
.await;
if let Err(error) = attach_result {
panic!("attach_hybrid_observation_relay failed: {error}");
}
let start_result = manager.start_all().await;
if let Err(error) = start_result {
panic!("start_all failed: {error}");
}
let client_option = manager.client("ws_a").await;
let client = match client_option {
Some(client) => client,
None => panic!("client must exist"),
};
let logs_subscribe_result = client
.logs_subscribe_raw(serde_json::json!("all"), None)
.await;
if let Err(error) = logs_subscribe_result {
panic!("logsSubscribe failed: {error}");
}
let program_subscribe_result = client
.program_subscribe_raw("HybridProgram111".to_string(), None)
.await;
if let Err(error) = program_subscribe_result {
panic!("programSubscribe failed: {error}");
}
let account_subscribe_result = client
.account_subscribe_raw("HybridPool111".to_string(), None)
.await;
if let Err(error) = account_subscribe_result {
panic!("accountSubscribe failed: {error}");
}
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
let observations_result =
crate::list_recent_onchain_observations(database.as_ref(), 20).await;
let observations = match observations_result {
Ok(observations) => observations,
Err(error) => panic!("list_recent_onchain_observations failed: {error}"),
};
let mut kinds = std::collections::BTreeSet::new();
for observation in &observations {
kinds.insert(observation.observation_kind.clone());
}
assert!(kinds.contains("ws.hybrid.logs_notification"));
assert!(kinds.contains("ws.hybrid.program_notification"));
assert!(kinds.contains("ws.hybrid.account_notification"));
let stop_result = manager.stop_all().await;
if let Err(error) = stop_result {
panic!("stop_all failed: {error}");
}
let detach_result = manager.detach_hybrid_observation_relay().await;
if let Err(error) = detach_result {
panic!("detach_hybrid_observation_relay failed: {error}");
}
server.shutdown().await;
}
}