This commit is contained in:
2026-04-21 09:12:29 +02:00
parent 83a8a879aa
commit ff3b20f13b
4 changed files with 429 additions and 1 deletions

View File

@@ -16,6 +16,7 @@ mod rpc_ws;
mod tracing;
mod types;
mod ws_client;
mod rpc_ws_solana;
pub use crate::config::KbAppConfig;
pub use crate::config::KbConfig;
@@ -44,3 +45,7 @@ pub use crate::ws_client::WsClient;
pub use crate::ws_client::WsEvent;
pub use crate::ws_client::WsOutgoingMessage;
pub use crate::ws_client::WsSubscriptionInfo;
pub use crate::rpc_ws_solana::KbSolanaWsTypedNotification;
pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification;
pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification_from_event;

422
kb_lib/src/rpc_ws_solana.rs Normal file
View File

@@ -0,0 +1,422 @@
// file: kb_lib/src/rpc_ws_solana.rs
//! Typed Solana WebSocket PubSub helpers built on top of the generic JSON-RPC
//! transport.
//!
//! This module keeps the low-level transport and registry logic in
//! [`crate::WsClient`] unchanged while adding:
//! - typed `*_typed()` subscribe helpers using `solana_rpc_client_api::config`
//! - typed notification decoding using `solana_rpc_client_api::response`
/// Typed Solana PubSub notification payload.
#[derive(Clone, Debug)]
pub enum KbSolanaWsTypedNotification {
/// `accountNotification`
Account(solana_rpc_client_api::response::Response<solana_rpc_client_api::response::UiAccount>),
/// `blockNotification`
Block(solana_rpc_client_api::response::RpcBlockUpdate),
/// `logsNotification`
Logs(
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcLogsResponse>,
),
/// `programNotification`
Program(
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcKeyedAccount>,
),
/// `rootNotification`
Root(u64),
/// `signatureNotification`
Signature(
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcSignatureResult,
>,
),
/// `slotNotification`
Slot(solana_rpc_client_api::response::SlotInfo),
/// `slotsUpdatesNotification`
SlotsUpdates(solana_rpc_client_api::response::SlotUpdate),
/// `voteNotification`
Vote(solana_rpc_client_api::response::RpcVote),
}
/// Parses a Solana JSON-RPC notification into an official typed payload.
pub fn parse_kb_solana_ws_typed_notification(
notification: &crate::KbJsonRpcWsNotification,
) -> Result<KbSolanaWsTypedNotification, crate::KbError> {
if notification.method == "accountNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::UiAccount>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Account(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse accountNotification payload: {error}"
))),
};
}
if notification.method == "blockNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::RpcBlockUpdate>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Block(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse blockNotification payload: {error}"
))),
};
}
if notification.method == "logsNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcLogsResponse,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Logs(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse logsNotification payload: {error}"
))),
};
}
if notification.method == "programNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcKeyedAccount,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Program(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse programNotification payload: {error}"
))),
};
}
if notification.method == "rootNotification" {
let root_option = notification.params.result.as_u64();
return match root_option {
Some(root) => Ok(KbSolanaWsTypedNotification::Root(root)),
None => Err(crate::KbError::Json(
"cannot parse rootNotification payload: result is not a u64".to_string(),
)),
};
}
if notification.method == "signatureNotification" {
let parse_result = serde_json::from_value::<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcSignatureResult,
>,
>(notification.params.result.clone());
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Signature(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse signatureNotification payload: {error}"
))),
};
}
if notification.method == "slotNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::SlotInfo>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Slot(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse slotNotification payload: {error}"
))),
};
}
if notification.method == "slotsUpdatesNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::SlotUpdate>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::SlotsUpdates(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse slotsUpdatesNotification payload: {error}"
))),
};
}
if notification.method == "voteNotification" {
let parse_result = serde_json::from_value::<solana_rpc_client_api::response::RpcVote>(
notification.params.result.clone(),
);
return match parse_result {
Ok(value) => Ok(KbSolanaWsTypedNotification::Vote(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse voteNotification payload: {error}"
))),
};
}
Err(crate::KbError::Json(format!(
"unsupported Solana websocket notification method '{}'",
notification.method
)))
}
/// Parses a typed Solana PubSub notification from a generic websocket event.
///
/// This returns:
/// - `Ok(Some(...))` for JSON-RPC notification-bearing events
/// - `Ok(None)` for events that do not carry a notification
/// - `Err(...)` when a notification is present but cannot be decoded
pub fn parse_kb_solana_ws_typed_notification_from_event(
event: &crate::WsEvent,
) -> Result<std::option::Option<KbSolanaWsTypedNotification>, crate::KbError> {
match event {
crate::WsEvent::SubscriptionNotification { notification, .. } => {
let parse_result = parse_kb_solana_ws_typed_notification(notification);
match parse_result {
Ok(value) => Ok(Some(value)),
Err(error) => Err(error),
}
}
crate::WsEvent::JsonRpcNotificationWithoutSubscription { notification, .. } => {
let parse_result = parse_kb_solana_ws_typed_notification(notification);
match parse_result {
Ok(value) => Ok(Some(value)),
Err(error) => Err(error),
}
}
crate::WsEvent::JsonRpcMessage { message, .. } => match message {
crate::KbJsonRpcWsIncomingMessage::Notification(notification) => {
let parse_result = parse_kb_solana_ws_typed_notification(notification);
match parse_result {
Ok(value) => Ok(Some(value)),
Err(error) => Err(error),
}
}
crate::KbJsonRpcWsIncomingMessage::SuccessResponse(_) => Ok(None),
crate::KbJsonRpcWsIncomingMessage::ErrorResponse(_) => Ok(None),
},
_ => Ok(None),
}
}
impl crate::WsClient {
/// Typed helper for `accountSubscribe`.
pub async fn account_subscribe_typed(
&self,
pubkey: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcAccountInfoConfig>,
) -> Result<u64, crate::KbError> {
let config_value_result =
kb_serialize_optional_config_value(config, "accountSubscribe config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
self.account_subscribe(pubkey, config_value).await
}
/// Typed helper for `blockSubscribe`.
pub async fn block_subscribe_typed(
&self,
filter: solana_rpc_client_api::config::RpcBlockSubscribeFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcBlockSubscribeConfig>,
) -> Result<u64, crate::KbError> {
let filter_value_result = kb_serialize_required_value(filter, "blockSubscribe filter");
let filter_value = match filter_value_result {
Ok(filter_value) => filter_value,
Err(error) => return Err(error),
};
let config_value_result =
kb_serialize_optional_config_value(config, "blockSubscribe config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
self.block_subscribe(filter_value, config_value).await
}
/// Typed helper for `logsSubscribe`.
pub async fn logs_subscribe_typed(
&self,
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
) -> Result<u64, crate::KbError> {
let filter_value_result = kb_serialize_required_value(filter, "logsSubscribe filter");
let filter_value = match filter_value_result {
Ok(filter_value) => filter_value,
Err(error) => return Err(error),
};
let config_value_result =
kb_serialize_optional_config_value(config, "logsSubscribe config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
self.logs_subscribe(filter_value, config_value).await
}
/// Typed helper for `programSubscribe`.
pub async fn program_subscribe_typed(
&self,
program_id: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
) -> Result<u64, crate::KbError> {
let config_value_result =
kb_serialize_optional_config_value(config, "programSubscribe config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
self.program_subscribe(program_id, config_value).await
}
/// Typed helper for `signatureSubscribe`.
pub async fn signature_subscribe_typed(
&self,
signature: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcSignatureSubscribeConfig>,
) -> Result<u64, crate::KbError> {
let config_value_result =
kb_serialize_optional_config_value(config, "signatureSubscribe config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
self.signature_subscribe(signature, config_value).await
}
}
fn kb_serialize_required_value<T>(
value: T,
label: &str,
) -> Result<serde_json::Value, crate::KbError>
where
T: serde::Serialize,
{
let value_result = serde_json::to_value(value);
match value_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KbError::Json(format!(
"cannot serialize {}: {error}",
label
))),
}
}
fn kb_serialize_optional_config_value<T>(
value: std::option::Option<T>,
label: &str,
) -> Result<std::option::Option<serde_json::Value>, crate::KbError>
where
T: serde::Serialize,
{
match value {
Some(value) => {
let value_result = serde_json::to_value(value);
match value_result {
Ok(value) => Ok(Some(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot serialize {}: {error}",
label
))),
}
}
None => Ok(None),
}
}
#[cfg(test)]
mod tests {
fn make_test_ws_client() -> crate::WsClient {
let endpoint = crate::KbWsEndpointConfig {
name: "test_ws".to_string(),
enabled: true,
provider: "test".to_string(),
url: "ws://127.0.0.1:65535".to_string(),
api_key_env_var: None,
roles: vec!["test".to_string()],
max_subscriptions: 8,
connect_timeout_ms: 1000,
request_timeout_ms: 1000,
unsubscribe_timeout_ms: 500,
write_channel_capacity: 8,
event_channel_capacity: 32,
auto_reconnect: false,
};
crate::WsClient::new(endpoint).expect("client creation must succeed")
}
#[test]
fn parse_root_notification_works() {
let notification = crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "rootNotification".to_string(),
params: crate::KbJsonRpcWsNotificationParams {
result: serde_json::Value::from(123u64),
subscription: 7,
},
};
let parsed = crate::parse_kb_solana_ws_typed_notification(&notification)
.expect("typed root notification parse must succeed");
match parsed {
crate::KbSolanaWsTypedNotification::Root(root) => {
assert_eq!(root, 123);
}
other => {
panic!("unexpected parsed notification: {other:?}");
}
}
}
#[test]
fn parse_slot_notification_works() {
let notification = crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "slotNotification".to_string(),
params: crate::KbJsonRpcWsNotificationParams {
result: serde_json::json!({
"parent": 10,
"root": 11,
"slot": 12
}),
subscription: 8,
},
};
let parsed = crate::parse_kb_solana_ws_typed_notification(&notification)
.expect("typed slot notification parse must succeed");
match parsed {
crate::KbSolanaWsTypedNotification::Slot(slot_info) => {
assert_eq!(slot_info.parent, 10);
assert_eq!(slot_info.root, 11);
assert_eq!(slot_info.slot, 12);
}
other => {
panic!("unexpected parsed notification: {other:?}");
}
}
}
#[tokio::test]
async fn account_subscribe_typed_uses_same_send_path() {
let client = make_test_ws_client();
let result = client
.account_subscribe_typed("11111111111111111111111111111111".to_string(), None)
.await;
match result {
Err(crate::KbError::NotConnected(_)) => {}
other => {
panic!("unexpected result: {other:?}");
}
}
}
#[tokio::test]
async fn signature_subscribe_typed_uses_same_send_path() {
let client = make_test_ws_client();
let result = client
.signature_subscribe_typed(
"2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b".to_string(),
None,
)
.await;
match result {
Err(crate::KbError::NotConnected(_)) => {}
other => {
panic!("unexpected result: {other:?}");
}
}
}
}