Files
khadhroony-bobobot/kb_lib/src/solana_pubsub_ws.rs
2026-05-10 00:33:01 +02:00

415 lines
16 KiB
Rust

// file: kb_lib/src/solana_pubsub_ws.rs
//! Typed Solana WebSocket PubSub helpers built on top of the generic JSON-RPC
//! transport.
//!
//! This module keeps the low-level transport and registry logic in
//! [`crate::WsClient`] unchanged while adding:
//! - typed `*_typed()` subscribe helpers using `solana_rpc_client_api::config`
//! - typed notification decoding using `solana_rpc_client_api::response`
/// Typed Solana PubSub notification payload.
///
/// Each variant wraps the decoded `result` value of one JSON-RPC notification
/// method. The method name quoted in a variant's doc comment is the exact
/// string that `parse_solana_ws_typed_notification` (in this file) compares
/// against `notification.method` to select that variant.
#[derive(Clone, Debug)]
pub enum SolanaWsTypedNotification {
/// `accountNotification` — payload decoded as `Response<UiAccount>`.
Account(solana_rpc_client_api::response::Response<solana_rpc_client_api::response::UiAccount>),
/// `blockNotification` — payload decoded as `RpcBlockUpdate`.
Block(solana_rpc_client_api::response::RpcBlockUpdate),
/// `logsNotification` — payload decoded as `Response<RpcLogsResponse>`.
Logs(
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcLogsResponse>,
),
/// `programNotification` — payload decoded as `Response<RpcKeyedAccount>`.
Program(
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcKeyedAccount>,
),
/// `rootNotification` — payload is a bare JSON number read as `u64`.
Root(u64),
/// `signatureNotification` — payload decoded as `Response<RpcSignatureResult>`.
Signature(
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcSignatureResult,
>,
),
/// `slotNotification` — payload decoded as `SlotInfo`.
Slot(solana_rpc_client_api::response::SlotInfo),
/// `slotsUpdatesNotification` — payload decoded as `SlotUpdate`.
SlotsUpdates(solana_rpc_client_api::response::SlotUpdate),
/// `voteNotification` — payload decoded as `RpcVote`.
Vote(solana_rpc_client_api::response::RpcVote),
}
/// Parses a Solana JSON-RPC notification into an official typed payload.
pub fn parse_solana_ws_typed_notification(
notification: &crate::JsonRpcWsNotification,
) -> Result<SolanaWsTypedNotification, crate::Error> {
if notification.method == "accountNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::UiAccount>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Account(value)),
Err(error) => Err(crate::Error::Json(format!(
"cannot parse accountNotification payload: {error}"
))),
};
}
if notification.method == "blockNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::RpcBlockUpdate>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Block(value)),
Err(error) => {
Err(crate::Error::Json(format!("cannot parse blockNotification payload: {error}")))
},
};
}
if notification.method == "logsNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcLogsResponse,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Logs(value)),
Err(error) => {
Err(crate::Error::Json(format!("cannot parse logsNotification payload: {error}")))
},
};
}
if notification.method == "programNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcKeyedAccount,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Program(value)),
Err(error) => Err(crate::Error::Json(format!(
"cannot parse programNotification payload: {error}"
))),
};
}
if notification.method == "rootNotification" {
let root_option = notification.params.result.as_u64();
return match root_option {
Some(root) => Ok(SolanaWsTypedNotification::Root(root)),
None => Err(crate::Error::Json(
"cannot parse rootNotification payload: result is not a u64".to_string(),
)),
};
}
if notification.method == "signatureNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcSignatureResult,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Signature(value)),
Err(error) => Err(crate::Error::Json(format!(
"cannot parse signatureNotification payload: {error}"
))),
};
}
if notification.method == "slotNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::SlotInfo>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Slot(value)),
Err(error) => {
Err(crate::Error::Json(format!("cannot parse slotNotification payload: {error}")))
},
};
}
if notification.method == "slotsUpdatesNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::SlotUpdate>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::SlotsUpdates(value)),
Err(error) => Err(crate::Error::Json(format!(
"cannot parse slotsUpdatesNotification payload: {error}"
))),
};
}
if notification.method == "voteNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::RpcVote>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(SolanaWsTypedNotification::Vote(value)),
Err(error) => {
Err(crate::Error::Json(format!("cannot parse voteNotification payload: {error}")))
},
};
}
return Err(crate::Error::Json(format!(
"unsupported Solana websocket notification method '{}'",
notification.method
)));
}
/// Extracts and decodes a typed Solana PubSub notification from a generic
/// websocket event.
///
/// Outcome by event kind:
/// - `Ok(Some(...))` when the event carries a JSON-RPC notification that
///   decodes successfully
/// - `Ok(None)` when the event carries no notification (responses and every
///   other event kind)
/// - `Err(...)` when a notification is present but
///   [`parse_solana_ws_typed_notification`] rejects it
pub fn parse_solana_ws_typed_notification_from_event(
    event: &crate::WsEvent,
) -> Result<std::option::Option<SolanaWsTypedNotification>, crate::Error> {
    match event {
        crate::WsEvent::SubscriptionNotification { notification, .. } => {
            parse_solana_ws_typed_notification(notification).map(Some)
        },
        crate::WsEvent::JsonRpcNotificationWithoutSubscription { notification, .. } => {
            parse_solana_ws_typed_notification(notification).map(Some)
        },
        crate::WsEvent::JsonRpcMessage { message, .. } => match message {
            crate::JsonRpcWsIncomingMessage::Notification(inner) => {
                parse_solana_ws_typed_notification(inner).map(Some)
            },
            // Request/response traffic never carries a notification payload.
            crate::JsonRpcWsIncomingMessage::SuccessResponse(_)
            | crate::JsonRpcWsIncomingMessage::ErrorResponse(_) => Ok(None),
        },
        _ => Ok(None),
    }
}
impl crate::WsClient {
    /// Typed wrapper around `account_subscribe_raw` (`accountSubscribe`).
    ///
    /// Serializes the optional typed `config` to JSON and forwards it together
    /// with `pubkey`. A serialization failure is returned as
    /// `crate::Error::Json` before anything is sent; otherwise the result of
    /// the raw call is returned unchanged.
    pub async fn account_subscribe_typed(
        &self,
        pubkey: std::string::String,
        config: std::option::Option<solana_rpc_client_api::config::RpcAccountInfoConfig>,
    ) -> Result<u64, crate::Error> {
        let encoded_config = serialize_optional_config_value(config, "accountSubscribe config")?;
        self.account_subscribe_raw(pubkey, encoded_config).await
    }

    /// Typed wrapper around `block_subscribe_raw` (`blockSubscribe`).
    ///
    /// Serializes `filter` first, then the optional `config`; the first
    /// serialization failure is returned as `crate::Error::Json`. Otherwise
    /// the result of the raw call is returned unchanged.
    pub async fn block_subscribe_typed(
        &self,
        filter: solana_rpc_client_api::config::RpcBlockSubscribeFilter,
        config: std::option::Option<solana_rpc_client_api::config::RpcBlockSubscribeConfig>,
    ) -> Result<u64, crate::Error> {
        let encoded_filter = serialize_required_value(filter, "blockSubscribe filter")?;
        let encoded_config = serialize_optional_config_value(config, "blockSubscribe config")?;
        self.block_subscribe_raw(encoded_filter, encoded_config).await
    }

    /// Typed wrapper around `logs_subscribe_raw` (`logsSubscribe`).
    ///
    /// Serializes `filter` first, then the optional `config`; the first
    /// serialization failure is returned as `crate::Error::Json`. Otherwise
    /// the result of the raw call is returned unchanged.
    pub async fn logs_subscribe_typed(
        &self,
        filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
        config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
    ) -> Result<u64, crate::Error> {
        let encoded_filter = serialize_required_value(filter, "logsSubscribe filter")?;
        let encoded_config = serialize_optional_config_value(config, "logsSubscribe config")?;
        self.logs_subscribe_raw(encoded_filter, encoded_config).await
    }

    /// Typed wrapper around `program_subscribe_raw` (`programSubscribe`).
    ///
    /// Serializes the optional typed `config` to JSON and forwards it together
    /// with `program_id`. A serialization failure is returned as
    /// `crate::Error::Json`; otherwise the result of the raw call is returned
    /// unchanged.
    pub async fn program_subscribe_typed(
        &self,
        program_id: std::string::String,
        config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
    ) -> Result<u64, crate::Error> {
        let encoded_config = serialize_optional_config_value(config, "programSubscribe config")?;
        self.program_subscribe_raw(program_id, encoded_config).await
    }

    /// Typed wrapper around `signature_subscribe_raw` (`signatureSubscribe`).
    ///
    /// Serializes the optional typed `config` to JSON and forwards it together
    /// with `signature`. A serialization failure is returned as
    /// `crate::Error::Json`; otherwise the result of the raw call is returned
    /// unchanged.
    pub async fn signature_subscribe_typed(
        &self,
        signature: std::string::String,
        config: std::option::Option<solana_rpc_client_api::config::RpcSignatureSubscribeConfig>,
    ) -> Result<u64, crate::Error> {
        let encoded_config =
            serialize_optional_config_value(config, "signatureSubscribe config")?;
        self.signature_subscribe_raw(signature, encoded_config).await
    }
}
fn serialize_required_value<T>(value: T, label: &str) -> Result<serde_json::Value, crate::Error>
where
T: serde::Serialize,
{
let value_result = serde_json::to_value(value);
match value_result {
Ok(value) => return Ok(value),
Err(error) => {
return Err(crate::Error::Json(format!("cannot serialize {}: {error}", label)));
},
}
}
fn serialize_optional_config_value<T>(
value: std::option::Option<T>,
label: &str,
) -> Result<std::option::Option<serde_json::Value>, crate::Error>
where
T: serde::Serialize,
{
match value {
Some(value) => {
let value_result = serde_json::to_value(value);
match value_result {
Ok(value) => return Ok(Some(value)),
Err(error) => {
return Err(crate::Error::Json(format!("cannot serialize {}: {error}", label)));
},
}
},
None => return Ok(None),
}
}
// Unit tests for typed notification parsing and the typed subscribe helpers.
// All items are referenced through `crate::` paths, so the parsing functions
// and types are expected to be re-exported at the crate root.
#[cfg(test)]
mod tests {
// Builds a `WsClient` from a fixed test endpoint configuration
// (`ws://127.0.0.1:65535`, `auto_reconnect: false`). Only `WsClient::new` is
// called here; no connect call is made in this helper.
fn make_test_ws_client() -> crate::WsClient {
let endpoint = crate::WsEndpointConfig {
name: "test_ws".to_string(),
enabled: true,
provider: "test".to_string(),
url: "ws://127.0.0.1:65535".to_string(),
api_key_env_var: None,
roles: vec!["test".to_string()],
max_subscriptions: 8,
connect_timeout_ms: 1000,
request_timeout_ms: 1000,
unsubscribe_timeout_ms: 500,
write_channel_capacity: 8,
event_channel_capacity: 32,
auto_reconnect: false,
};
return crate::WsClient::new(endpoint).expect("client creation must succeed");
}
// A `rootNotification` whose result is the JSON number 123 must decode to
// `SolanaWsTypedNotification::Root(123)`.
#[test]
fn parse_root_notification_works() {
let notification = crate::JsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "rootNotification".to_string(),
params: crate::JsonRpcWsNotificationParams {
result: serde_json::Value::from(123u64),
subscription: 7,
},
};
let parsed = crate::parse_solana_ws_typed_notification(&notification)
.expect("typed root notification parse must succeed");
match parsed {
crate::SolanaWsTypedNotification::Root(root) => {
assert_eq!(root, 123);
},
other => {
panic!("unexpected parsed notification: {other:?}");
},
}
}
// A `slotNotification` with `parent`/`root`/`slot` fields must decode to
// `SolanaWsTypedNotification::Slot` with the same three values.
#[test]
fn parse_slot_notification_works() {
let notification = crate::JsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "slotNotification".to_string(),
params: crate::JsonRpcWsNotificationParams {
result: serde_json::json!({
"parent": 10,
"root": 11,
"slot": 12
}),
subscription: 8,
},
};
let parsed = crate::parse_solana_ws_typed_notification(&notification)
.expect("typed slot notification parse must succeed");
match parsed {
crate::SolanaWsTypedNotification::Slot(slot_info) => {
assert_eq!(slot_info.parent, 10);
assert_eq!(slot_info.root, 11);
assert_eq!(slot_info.slot, 12);
},
other => {
panic!("unexpected parsed notification: {other:?}");
},
}
}
// On a freshly created client (nothing in this test connects it), the typed
// account subscribe helper must fail with `Error::NotConnected`.
#[tokio::test]
async fn account_subscribe_typed_uses_same_send_path() {
let client = make_test_ws_client();
let result =
client.account_subscribe_typed(crate::SYSTEM_PROGRAM_ID.to_string(), None).await;
match result {
Err(crate::Error::NotConnected(_)) => {},
other => {
panic!("unexpected result: {other:?}");
},
}
}
// Same expectation for the typed signature subscribe helper: a client that
// was never connected must yield `Error::NotConnected`.
#[tokio::test]
async fn signature_subscribe_typed_uses_same_send_path() {
let client = make_test_ws_client();
let result = client
.signature_subscribe_typed(
"2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b".to_string(),
None,
)
.await;
match result {
Err(crate::Error::NotConnected(_)) => {},
other => {
panic!("unexpected result: {other:?}");
},
}
}
}