This commit is contained in:
2026-04-18 14:38:04 +02:00
parent 5ac8a7f1d5
commit c2ec9b8b41
4 changed files with 611 additions and 315 deletions

View File

@@ -8,7 +8,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.4.3" version = "0.4.4"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot"

View File

@@ -63,3 +63,5 @@ pub use crate::solana_rpc_ws::KhbbWsJsonRpcResponseEnvelope;
pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope; pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope;
/// Notification params envelope used by Solana PubSub notifications. /// Notification params envelope used by Solana PubSub notifications.
pub use crate::solana_rpc_ws::KhbbWsNotificationParams; pub use crate::solana_rpc_ws::KhbbWsNotificationParams;
/// Classified incoming WebSocket message.
pub use crate::solana_rpc_ws::KhbbWsIncomingMessage;

View File

@@ -27,6 +27,7 @@ pub async fn run_listener_runtime(
let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms); let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms);
let mut interval = tokio::time::interval(tick_duration); let mut interval = tokio::time::interval(tick_duration);
let mut tick_count: u64 = 0; let mut tick_count: u64 = 0;
let mut final_status = std::string::String::from("stopped");
let http_client_config = let http_client_config =
crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() }; crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() };
let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config); let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config);
@@ -45,7 +46,6 @@ pub async fn run_listener_runtime(
return Err(error); return Err(error);
}, },
}; };
let mut ws_subscription_handles = std::vec::Vec::<crate::KhbbWsSubscriptionHandle>::new();
let ws_connect_result = ws_client.connect().await; let ws_connect_result = ws_client.connect().await;
match ws_connect_result { match ws_connect_result {
Ok(()) => { Ok(()) => {
@@ -59,6 +59,7 @@ pub async fn run_listener_runtime(
return Err(error); return Err(error);
}, },
} }
let mut ws_subscription_handles = std::vec::Vec::<crate::KhbbWsSubscriptionHandle>::new();
if config.enable_ws_slot_subscribe { if config.enable_ws_slot_subscribe {
let slot_subscribe_result = ws_client.slot_subscribe(1).await; let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result { let slot_subscribe_output = match slot_subscribe_result {
@@ -234,7 +235,6 @@ pub async fn run_listener_runtime(
program_request_id = program_request_id.saturating_add(1); program_request_id = program_request_id.saturating_add(1);
} }
} }
let mut final_status = std::string::String::from("stopped");
loop { loop {
tokio::select! { tokio::select! {
_ = interval.tick() => { _ = interval.tick() => {
@@ -253,7 +253,7 @@ pub async fn run_listener_runtime(
"ok" "ok"
}; };
let insert_result = crate::storage::insert_raw_http_rpc_message( let insert_result = crate::storage::insert_raw_http_rpc_message(
&pool, pool,
session.id, session.id,
call_output.request_id as i64, call_output.request_id as i64,
&call_output.method, &call_output.method,
@@ -286,133 +286,161 @@ pub async fn run_listener_runtime(
} }
let ws_read_timeout_result = tokio::time::timeout( let ws_read_timeout_result = tokio::time::timeout(
std::time::Duration::from_millis(50), std::time::Duration::from_millis(50),
ws_client.read_next_text_message(), ws_client.read_next_incoming_message(),
) )
.await; .await;
match ws_read_timeout_result { match ws_read_timeout_result {
Ok(read_result) => { Ok(read_result) => {
match read_result { match read_result {
Ok(Some(message_text)) => { Ok(crate::KhbbWsIncomingMessage::Response { raw, id, .. }) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message( let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool, pool,
session.id, session.id,
"incoming", "incoming",
&message_text, &raw,
) )
.await; .await;
match insert_ws_message_result { match insert_ws_message_result {
Ok(()) => { Ok(()) => {
tracing::trace!( tracing::trace!(
listener_session_id = session.id, listener_session_id = session.id,
"raw websocket message stored" response_id = id,
"raw websocket response stored"
); );
} }
Err(error) => { Err(error) => {
tracing::error!( tracing::error!(
listener_session_id = session.id, listener_session_id = session.id,
error = %error, error = %error,
"failed to store incoming websocket message" "failed to store incoming websocket response"
);
}
}
let method_value_result =
serde_json::from_str::<serde_json::Value>(&message_text);
match method_value_result {
Ok(json_value) => {
let method_option = json_value
.get("method")
.and_then(serde_json::Value::as_str);
match method_option {
Some("slotNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_slot_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
slot = notification.params.result.slot,
parent = notification.params.result.parent,
root = notification.params.result.root,
"parsed slot notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse slot notification"
);
}
}
}
Some("logsNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_logs_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
signature = %notification.params.result.value.signature,
"parsed logs notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse logs notification"
);
}
}
}
Some("programNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_program_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
program_pubkey = %notification.params.result.value.pubkey,
"parsed program notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse program notification"
);
}
}
}
Some(other_method) => {
tracing::trace!(
listener_session_id = session.id,
method = %other_method,
"received unsupported websocket notification method"
);
}
None => {
tracing::trace!(
listener_session_id = session.id,
"received websocket json message without notification method"
);
}
}
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to decode websocket message as json value"
); );
} }
} }
} }
Ok(None) => { Ok(crate::KhbbWsIncomingMessage::Notification { raw, method, .. }) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&raw,
)
.await;
match insert_ws_message_result {
Ok(()) => {
tracing::trace!(
listener_session_id = session.id,
method = %method,
"raw websocket notification stored"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket notification"
);
}
}
match method.as_str() {
"slotNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_slot_notification(&raw);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
slot = notification.params.result.slot,
parent = notification.params.result.parent,
root = notification.params.result.root,
"parsed slot notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse slot notification"
);
}
}
}
"logsNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_logs_notification(&raw);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
signature = %notification.params.result.value.signature,
"parsed logs notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse logs notification"
);
}
}
}
"programNotification" => {
let parse_result =
crate::solana_rpc_ws::parse_program_notification(&raw);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
program_pubkey = %notification.params.result.value.pubkey,
"parsed program notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse program notification"
);
}
}
}
_ => {
tracing::trace!(
listener_session_id = session.id,
method = %method,
"received unsupported websocket notification method"
);
}
}
}
Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&raw,
)
.await;
match insert_ws_message_result {
Ok(()) => {
tracing::trace!(
listener_session_id = session.id,
"raw unknown websocket message stored"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming unknown websocket message"
);
}
}
}
Ok(crate::KhbbWsIncomingMessage::StreamEnded) => {
tracing::info!( tracing::info!(
listener_session_id = session.id, listener_session_id = session.id,
"websocket stream ended" "websocket stream ended"
@@ -426,6 +454,7 @@ pub async fn run_listener_runtime(
error = %error, error = %error,
"failed to read websocket message" "failed to read websocket message"
); );
final_status = std::string::String::from("ws_read_error"); final_status = std::string::String::from("ws_read_error");
break; break;
} }
@@ -462,6 +491,7 @@ pub async fn run_listener_runtime(
); );
} }
} }
return Err(crate::KhbbError::Runtime { return Err(crate::KhbbError::Runtime {
context: "wait for ctrl-c", context: "wait for ctrl-c",
message: error.to_string(), message: error.to_string(),
@@ -471,31 +501,50 @@ pub async fn run_listener_runtime(
} }
} }
} }
tracing::info!(
listener_session_id = session.id,
subscription_count = ws_subscription_handles.len(),
"starting websocket unsubscribe phase"
);
for subscription_handle in &ws_subscription_handles { for subscription_handle in &ws_subscription_handles {
let unsubscribe_result = ws_client let unsubscribe_timeout_result = tokio::time::timeout(
.unsubscribe( std::time::Duration::from_millis(500),
ws_client.unsubscribe(
subscription_handle.kind, subscription_handle.kind,
subscription_handle.subscription_id, subscription_handle.subscription_id,
tick_count.saturating_add(subscription_handle.request_id), 1000u64
) .saturating_add(tick_count)
.await; .saturating_add(subscription_handle.request_id),
match unsubscribe_result { ),
Ok(value) => { )
tracing::info!( .await;
listener_session_id = session.id, match unsubscribe_timeout_result {
unsubscribed = value, Ok(unsubscribe_result) => match unsubscribe_result {
subscription_id = subscription_handle.subscription_id, Ok(value) => {
kind = ?subscription_handle.kind, tracing::info!(
"websocket subscription cancelled" listener_session_id = session.id,
); unsubscribed = value,
subscription_id = subscription_handle.subscription_id,
kind = ?subscription_handle.kind,
"websocket subscription cancelled"
);
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
subscription_id = subscription_handle.subscription_id,
kind = ?subscription_handle.kind,
"failed to cancel websocket subscription"
);
},
}, },
Err(error) => { Err(_) => {
tracing::error!( tracing::error!(
listener_session_id = session.id, listener_session_id = session.id,
error = %error,
subscription_id = subscription_handle.subscription_id, subscription_id = subscription_handle.subscription_id,
kind = ?subscription_handle.kind, kind = ?subscription_handle.kind,
"failed to cancel websocket subscription" "websocket unsubscribe timed out"
); );
}, },
} }
@@ -517,7 +566,11 @@ pub async fn run_listener_runtime(
crate::storage::update_listener_session_status(pool, session.id, &final_status).await; crate::storage::update_listener_session_status(pool, session.id, &final_status).await;
match status_update_result { match status_update_result {
Ok(()) => { Ok(()) => {
tracing::info!(listener_session_id = session.id, "listener runtime stopped"); tracing::info!(
listener_session_id = session.id,
final_status = %final_status,
"listener runtime stopped"
);
}, },
Err(error) => { Err(error) => {
return Err(error); return Err(error);

View File

@@ -147,6 +147,50 @@ pub struct KhbbWsSubscribeCallOutput {
pub response_body: std::string::String, pub response_body: std::string::String,
} }
/// Classified incoming WebSocket message.
#[derive(Debug, Clone)]
pub enum KhbbWsIncomingMessage {
/// A JSON-RPC response message identified by `id`.
Response {
/// Raw JSON message.
raw: std::string::String,
/// JSON-RPC response id.
id: u64,
/// Parsed JSON value.
json: serde_json::Value,
},
/// A PubSub notification message identified by `method`.
Notification {
/// Raw JSON message.
raw: std::string::String,
/// Notification method.
method: std::string::String,
/// Parsed JSON value.
json: serde_json::Value,
},
/// A JSON message that is neither a response nor a known notification.
Unknown {
/// Raw JSON message.
raw: std::string::String,
/// Parsed JSON value.
json: serde_json::Value,
},
/// The WebSocket stream ended.
StreamEnded,
}
impl KhbbWsIncomingMessage {
/// Returns the raw JSON message if this variant carries one.
pub fn raw_message_text(&self) -> std::option::Option<&str> {
match self {
Self::Response { raw, .. } => Some(raw.as_str()),
Self::Notification { raw, .. } => Some(raw.as_str()),
Self::Unknown { raw, .. } => Some(raw.as_str()),
Self::StreamEnded => None,
}
}
}
/// Minimal Solana WebSocket JSON-RPC client. /// Minimal Solana WebSocket JSON-RPC client.
#[derive(Debug)] #[derive(Debug)]
pub struct KhbbSolanaWsRpcClient { pub struct KhbbSolanaWsRpcClient {
@@ -158,6 +202,8 @@ pub struct KhbbSolanaWsRpcClient {
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>, >,
>, >,
/// Pending incoming messages captured while waiting for specific responses.
pub(crate) pending_incoming_messages: std::collections::VecDeque<KhbbWsIncomingMessage>,
} }
impl KhbbSolanaWsRpcClient { impl KhbbSolanaWsRpcClient {
@@ -172,7 +218,11 @@ impl KhbbSolanaWsRpcClient {
), ),
}); });
} }
Ok(Self { config, ws_stream: None }) Ok(Self {
config,
ws_stream: None,
pending_incoming_messages: std::collections::VecDeque::new(),
})
} }
/// Connects the WebSocket client to the configured endpoint. /// Connects the WebSocket client to the configured endpoint.
@@ -270,11 +320,181 @@ impl KhbbSolanaWsRpcClient {
} }
} }
/// Reads the next text message from the socket. /// Reads the next classified incoming message.
/// ///
/// This method skips ping and pong frames, answers ping with pong, and /// This method first drains the internal pending queue, then reads from the
/// returns `Ok(None)` on close or end-of-stream. /// socket if the queue is empty.
pub async fn read_next_text_message( pub async fn read_next_incoming_message(
&mut self,
) -> core::result::Result<KhbbWsIncomingMessage, crate::KhbbError> {
let pending_message_option = self.pending_incoming_messages.pop_front();
if let Some(message) = pending_message_option {
return Ok(message);
}
let raw_message_result = self.read_next_raw_text_message().await;
let raw_message_option = match raw_message_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let raw_message = match raw_message_option {
Some(value) => value,
None => {
return Ok(KhbbWsIncomingMessage::StreamEnded);
},
};
classify_incoming_message(&raw_message)
}
/// Performs a `slotSubscribe` call.
pub async fn slot_subscribe(
&mut self,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
self.subscribe_with_raw_response(
KhbbWsSubscriptionKind::Slot,
std::vec::Vec::<serde_json::Value>::new(),
id,
)
.await
}
/// Performs a `logsSubscribe` call.
pub async fn logs_subscribe(
&mut self,
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_logs_subscribe_params(filter, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await
}
/// Performs a `programSubscribe` call.
pub async fn program_subscribe(
&mut self,
program_id: &str,
config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_program_subscribe_params(program_id, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id)
.await
}
/// Performs a generic unsubscribe call.
pub async fn unsubscribe(
&mut self,
kind: KhbbWsSubscriptionKind,
subscription_id: u64,
id: u64,
) -> core::result::Result<bool, crate::KhbbError> {
let request_body_result = self
.send_json_rpc_request(
kind.unsubscribe_method_name(),
vec![serde_json::Value::Number(subscription_id.into())],
id,
)
.await;
match request_body_result {
Ok(_) => {}
Err(error) => {
return Err(error);
}
}
let response_body_result = self.wait_for_response_raw_by_id(id).await;
let response_body = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
}
};
let parsed_response_result = parse_json_rpc_response::<bool>(&response_body);
let parsed_response = match parsed_response_result {
Ok(value) => value,
Err(error) => {
return Err(error);
}
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket unsubscribe rpc error",
message: error.to_string(),
});
}
};
return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned rpc error",
message: error_text,
});
}
match parsed_response.result {
Some(value) => Ok(value),
None => Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned empty result",
message: response_body,
}),
}
}
async fn subscribe_with_raw_response<TParams>(
&mut self,
kind: KhbbWsSubscriptionKind,
params: TParams,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError>
where
TParams: serde::Serialize + serde::de::DeserializeOwned,
{
let request_body_result =
self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await;
let request_body = match request_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body_result = self.wait_for_response_raw_by_id(id).await;
let response_body = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let subscription_id_result = parse_subscription_id_response(&response_body);
let subscription_id = match subscription_id_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
Ok(KhbbWsSubscribeCallOutput {
request_id: id,
method: std::string::String::from(kind.subscribe_method_name()),
subscription_id,
request_body,
response_body,
})
}
async fn read_next_raw_text_message(
&mut self, &mut self,
) -> core::result::Result<std::option::Option<std::string::String>, crate::KhbbError> { ) -> core::result::Result<std::option::Option<std::string::String>, crate::KhbbError> {
loop { loop {
@@ -345,208 +565,196 @@ impl KhbbSolanaWsRpcClient {
} }
} }
/// Performs a `slotSubscribe` call. async fn wait_for_response_raw_by_id(
pub async fn slot_subscribe(
&mut self, &mut self,
id: u64, expected_id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> { ) -> core::result::Result<std::string::String, crate::KhbbError> {
self.subscribe_with_raw_response( let mut deferred_messages = std::vec::Vec::<KhbbWsIncomingMessage>::new();
KhbbWsSubscriptionKind::Slot,
std::vec::Vec::<serde_json::Value>::new(),
id,
)
.await
}
/// Performs a `logsSubscribe` call.
pub async fn logs_subscribe(
&mut self,
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_logs_subscribe_params(filter, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await
}
/// Performs a `programSubscribe` call.
pub async fn program_subscribe(
&mut self,
program_id: &str,
config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_program_subscribe_params(program_id, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id)
.await
}
/// Performs a generic unsubscribe call.
pub async fn unsubscribe(
&mut self,
kind: KhbbWsSubscriptionKind,
subscription_id: u64,
id: u64,
) -> core::result::Result<bool, crate::KhbbError> {
let request_body_result = self
.send_json_rpc_request(
kind.unsubscribe_method_name(),
vec![serde_json::Value::Number(subscription_id.into())],
id,
)
.await;
match request_body_result {
Ok(_) => {},
Err(error) => {
return Err(error);
},
}
loop { loop {
let response_body_result = self.read_next_text_message().await; let pending_message_option = self.pending_incoming_messages.pop_front();
let response_body_option = match response_body_result { if let Some(message) = pending_message_option {
Ok(value) => value, match message {
Err(error) => { KhbbWsIncomingMessage::Response { raw, id, .. } => {
return Err(error); if id == expected_id {
}, while let Some(deferred_message) = deferred_messages.pop() {
}; self.pending_incoming_messages.push_front(deferred_message);
let response_body = match response_body_option { }
Some(value) => value, return Ok(raw);
None => { }
return Err(crate::KhbbError::Runtime { let parse_json_result = serde_json::from_str::<serde_json::Value>(&raw);
context: "read websocket unsubscribe response", let json_value = match parse_json_result {
message: std::string::String::from( Ok(value) => value,
"websocket stream ended before unsubscribe response", Err(error) => {
), while let Some(deferred_message) = deferred_messages.pop() {
}); self.pending_incoming_messages.push_front(deferred_message);
}, }
}; return Err(crate::KhbbError::Json {
let json_value_result = serde_json::from_str::<serde_json::Value>(&response_body); context: "reparse deferred websocket response",
let json_value = match json_value_result { message: error.to_string(),
Ok(value) => value, });
Err(error) => { },
return Err(crate::KhbbError::Json { };
context: "decode websocket unsubscribe response as json value", deferred_messages.push(KhbbWsIncomingMessage::Response {
message: error.to_string(), raw,
}); id,
}, json: json_value,
};
let id_value_option = json_value.get("id");
if id_value_option.is_none() {
continue;
}
let parsed_response_result = parse_json_rpc_response::<bool>(&response_body);
let parsed_response = match parsed_response_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket unsubscribe rpc error",
message: error.to_string(),
}); });
}, },
}; KhbbWsIncomingMessage::Notification { raw, method, json } => {
return Err(crate::KhbbError::Runtime { deferred_messages.push(KhbbWsIncomingMessage::Notification {
context: "websocket unsubscribe returned rpc error", raw,
message: error_text, method,
}); json,
});
},
KhbbWsIncomingMessage::Unknown { raw, json } => {
deferred_messages.push(KhbbWsIncomingMessage::Unknown { raw, json });
},
KhbbWsIncomingMessage::StreamEnded => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(crate::KhbbError::Runtime {
context: "wait for websocket response by id",
message: std::string::String::from(
"websocket stream ended while waiting for response",
),
});
},
}
continue;
} }
match parsed_response.result { let raw_message_result = self.read_next_raw_text_message().await;
Some(value) => return Ok(value), let raw_message_option = match raw_message_result {
Ok(value) => value,
Err(error) => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(error);
},
};
let raw_message = match raw_message_option {
Some(value) => value,
None => { None => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(crate::KhbbError::Runtime { return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned empty result", context: "wait for websocket response by id",
message: response_body, message: std::string::String::from(
"websocket stream ended while waiting for response",
),
});
},
};
let classified_result = classify_incoming_message(&raw_message);
let classified_message = match classified_result {
Ok(value) => value,
Err(error) => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(error);
},
};
match classified_message {
KhbbWsIncomingMessage::Response { raw, id, .. } => {
if id == expected_id {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Ok(raw);
}
let parse_json_result = serde_json::from_str::<serde_json::Value>(&raw);
let json_value = match parse_json_result {
Ok(value) => value,
Err(error) => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(crate::KhbbError::Json {
context: "reparse deferred websocket response",
message: error.to_string(),
});
},
};
deferred_messages.push(KhbbWsIncomingMessage::Response {
raw,
id,
json: json_value,
});
},
KhbbWsIncomingMessage::Notification { raw, method, json } => {
deferred_messages.push(KhbbWsIncomingMessage::Notification {
raw,
method,
json,
});
},
KhbbWsIncomingMessage::Unknown { raw, json } => {
deferred_messages.push(KhbbWsIncomingMessage::Unknown { raw, json });
},
KhbbWsIncomingMessage::StreamEnded => {
while let Some(deferred_message) = deferred_messages.pop() {
self.pending_incoming_messages.push_front(deferred_message);
}
return Err(crate::KhbbError::Runtime {
context: "wait for websocket response by id",
message: std::string::String::from(
"websocket stream ended while waiting for response",
),
}); });
}, },
} }
} }
} }
}
async fn subscribe_with_raw_response<TParams>( /// Classifies an incoming raw WebSocket JSON message.
&mut self, pub(crate) fn classify_incoming_message(
kind: KhbbWsSubscriptionKind, raw_message: &str,
params: TParams, ) -> core::result::Result<KhbbWsIncomingMessage, crate::KhbbError> {
id: u64, let parse_result = serde_json::from_str::<serde_json::Value>(raw_message);
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> let json_value = match parse_result {
where Ok(value) => value,
TParams: serde::Serialize + serde::de::DeserializeOwned, Err(error) => {
{ return Err(crate::KhbbError::Json {
let request_body_result = context: "classify websocket incoming message",
self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await; message: error.to_string(),
let request_body = match request_body_result { });
Ok(value) => value, },
Err(error) => { };
return Err(error); classify_incoming_message_from_json_value(raw_message, json_value)
}, }
};
loop { pub(crate) fn classify_incoming_message_from_json_value(
let response_body_result = self.read_next_text_message().await; raw_message: &str,
let response_body_option = match response_body_result { json_value: serde_json::Value,
Ok(value) => value, ) -> core::result::Result<KhbbWsIncomingMessage, crate::KhbbError> {
Err(error) => { let id_option = json_value.get("id");
return Err(error); if let Some(id_value) = id_option {
}, let id_as_u64_option = id_value.as_u64();
}; if let Some(id) = id_as_u64_option {
let response_body = match response_body_option { return Ok(KhbbWsIncomingMessage::Response {
Some(value) => value, raw: std::string::String::from(raw_message),
None => { id,
return Err(crate::KhbbError::Runtime { json: json_value,
context: "read websocket subscribe response",
message: std::string::String::from(
"websocket stream ended before subscribe response",
),
});
},
};
let json_value_result = serde_json::from_str::<serde_json::Value>(&response_body);
let json_value = match json_value_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "decode websocket subscribe response as json value",
message: error.to_string(),
});
},
};
let id_value_option = json_value.get("id");
if id_value_option.is_none() {
continue;
}
let subscription_id_result = parse_subscription_id_response(&response_body);
let subscription_id = match subscription_id_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
return Ok(KhbbWsSubscribeCallOutput {
request_id: id,
method: std::string::String::from(kind.subscribe_method_name()),
subscription_id,
request_body,
response_body,
}); });
} }
} }
let method_option = json_value.get("method").and_then(serde_json::Value::as_str);
if let Some(method) = method_option {
return Ok(KhbbWsIncomingMessage::Notification {
raw: std::string::String::from(raw_message),
method: std::string::String::from(method),
json: json_value,
});
}
Ok(KhbbWsIncomingMessage::Unknown {
raw: std::string::String::from(raw_message),
json: json_value,
})
} }
/// Parses a standard JSON-RPC response envelope. /// Parses a standard JSON-RPC response envelope.
@@ -577,6 +785,7 @@ pub(crate) fn parse_slot_notification(
let parse_result = serde_json::from_str::< let parse_result = serde_json::from_str::<
KhbbWsNotificationEnvelope<solana_rpc_client_api::response::SlotInfo>, KhbbWsNotificationEnvelope<solana_rpc_client_api::response::SlotInfo>,
>(response_body); >(response_body);
match parse_result { match parse_result {
Ok(value) => Ok(value), Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json { Err(error) => Err(crate::KhbbError::Json {
@@ -761,6 +970,38 @@ mod tests {
); );
} }
#[test]
fn classify_incoming_message_accepts_response() {
let body = r#"{"jsonrpc":"2.0","id":7,"result":42}"#;
let result = super::classify_incoming_message(body);
assert!(result.is_ok());
let message = result.expect("classify response");
match message {
super::KhbbWsIncomingMessage::Response { id, .. } => {
assert_eq!(id, 7);
},
_ => {
panic!("expected response");
},
}
}
#[test]
fn classify_incoming_message_accepts_notification() {
let body = r#"{"jsonrpc":"2.0","method":"slotNotification","params":{"subscription":1,"result":{"slot":2,"parent":1,"root":1}}}"#;
let result = super::classify_incoming_message(body);
assert!(result.is_ok());
let message = result.expect("classify notification");
match message {
super::KhbbWsIncomingMessage::Notification { method, .. } => {
assert_eq!(method, "slotNotification");
},
_ => {
panic!("expected notification");
},
}
}
#[test] #[test]
fn parse_subscription_id_response_accepts_success_payload() { fn parse_subscription_id_response_accepts_success_payload() {
let body = r#"{ let body = r#"{