This commit is contained in:
2026-04-18 12:03:44 +02:00
parent b5cde7d512
commit 0e8c7ac403
5 changed files with 1050 additions and 12 deletions

View File

@@ -17,6 +17,7 @@ mod listener;
mod solana_rpc_http;
mod storage;
mod tracing_setup;
mod solana_rpc_ws;
/// Runs the listener application bootstrap workflow.
pub use crate::app::run_listener_app;
@@ -44,3 +45,21 @@ pub use crate::storage::create_sqlite_pool;
pub use crate::storage::ensure_sqlite_schema;
/// Initializes the tracing subscriber for the process.
pub use crate::tracing_setup::init_tracing;
/// Minimal Solana WebSocket JSON-RPC client.
pub use crate::solana_rpc_ws::KhbbSolanaWsRpcClient;
/// Configuration of the minimal Solana WebSocket RPC client.
pub use crate::solana_rpc_ws::KhbbSolanaWsRpcClientConfig;
/// Supported initial WebSocket subscription kinds.
pub use crate::solana_rpc_ws::KhbbWsSubscriptionKind;
/// Handle returned after a successful WebSocket subscription.
pub use crate::solana_rpc_ws::KhbbWsSubscriptionHandle;
/// Output of a successful WebSocket subscribe call.
pub use crate::solana_rpc_ws::KhbbWsSubscribeCallOutput;
/// Minimal JSON-RPC request envelope for WebSocket RPC calls.
pub use crate::solana_rpc_ws::KhbbWsJsonRpcRequestEnvelope;
/// Minimal JSON-RPC response envelope for WebSocket RPC calls.
pub use crate::solana_rpc_ws::KhbbWsJsonRpcResponseEnvelope;
/// Notification envelope used by Solana PubSub notifications.
pub use crate::solana_rpc_ws::KhbbWsNotificationEnvelope;
/// Notification params envelope used by Solana PubSub notifications.
pub use crate::solana_rpc_ws::KhbbWsNotificationParams;

View File

@@ -26,21 +26,100 @@ pub async fn run_listener_runtime(
let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms);
let mut interval = tokio::time::interval(tick_duration);
let mut tick_count: u64 = 0;
let http_client_config = crate::KhbbSolanaHttpRpcClientConfig {
url: config.solana_http_rpc_url.clone(),
};
let http_client_result =
crate::KhbbSolanaHttpRpcClient::new(http_client_config);
let http_client_config =
crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() };
let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config);
let http_client = match http_client_result {
Ok(value) => value,
Err(error) => {
return Err(error);
}
},
};
let ws_client_config =
crate::KhbbSolanaWsRpcClientConfig { url: config.solana_ws_rpc_url.clone() };
let ws_client_result = crate::KhbbSolanaWsRpcClient::new(ws_client_config);
let mut ws_client = match ws_client_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let ws_connect_result = ws_client.connect().await;
match ws_connect_result {
Ok(()) => {
tracing::info!(
listener_session_id = session.id,
solana_ws_rpc_url = %config.solana_ws_rpc_url,
"websocket rpc client connected"
);
},
Err(error) => {
return Err(error);
},
}
let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"outgoing",
&slot_subscribe_output.request_body,
)
.await;
match insert_ws_outgoing_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store outgoing websocket subscribe request"
);
},
}
let insert_ws_incoming_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&slot_subscribe_output.response_body,
)
.await;
match insert_ws_incoming_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket subscribe response"
);
},
}
let slot_subscription_handle = crate::KhbbWsSubscriptionHandle {
request_id: slot_subscribe_output.request_id,
subscription_id: slot_subscribe_output.subscription_id,
kind: crate::KhbbWsSubscriptionKind::Slot,
};
tracing::info!(
listener_session_id = session.id,
request_id = slot_subscription_handle.request_id,
subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription established"
);
loop {
tokio::select! {
_ = interval.tick() => {
tick_count = tick_count.saturating_add(1);
tracing::trace!(
listener_session_id = session.id,
tick_count = tick_count,
"listener runtime heartbeat"
);
let slot_result = http_client.get_slot(tick_count as u64).await;
match slot_result {
Ok(call_output) => {
@@ -64,6 +143,14 @@ pub async fn run_listener_runtime(
error = %error,
"failed to insert raw http rpc message"
);
} else {
tracing::trace!(
listener_session_id = session.id,
request_id = call_output.request_id,
method = %call_output.method,
status = status,
"raw http rpc message stored"
);
}
}
Err(error) => {
@@ -73,11 +160,55 @@ pub async fn run_listener_runtime(
);
}
}
tracing::trace!(
listener_session_id = session.id,
tick_count = tick_count,
"listener runtime heartbeat"
);
let ws_read_timeout_result = tokio::time::timeout(
std::time::Duration::from_millis(50),
ws_client.read_next_text_message(),
)
.await;
match ws_read_timeout_result {
Ok(read_result) => {
match read_result {
Ok(Some(message_text)) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&message_text,
)
.await;
match insert_ws_message_result {
Ok(()) => {
tracing::trace!(
listener_session_id = session.id,
"raw websocket message stored"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket message"
);
}
}
}
Ok(None) => {
tracing::info!(
listener_session_id = session.id,
"websocket stream ended"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to read websocket message"
);
}
}
}
Err(_) => {}
}
if tick_count >= 3 {
break;
}
@@ -116,6 +247,44 @@ pub async fn run_listener_runtime(
}
}
}
let unsubscribe_result = ws_client
.unsubscribe(
slot_subscription_handle.kind,
slot_subscription_handle.subscription_id,
tick_count.saturating_add(10),
)
.await;
match unsubscribe_result {
Ok(value) => {
tracing::info!(
listener_session_id = session.id,
unsubscribed = value,
subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription cancelled"
);
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
subscription_id = slot_subscription_handle.subscription_id,
"failed to cancel slot websocket subscription"
);
},
}
let ws_close_result = ws_client.close().await;
match ws_close_result {
Ok(()) => {
tracing::info!(listener_session_id = session.id, "websocket rpc client closed");
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to close websocket rpc client"
);
},
}
let status_update_result =
crate::storage::update_listener_session_status(pool, session.id, "stopped").await;
match status_update_result {

View File

@@ -0,0 +1,796 @@
// file: khbb_lib/src/solana_rpc_ws.rs
//! Minimal Solana WebSocket JSON-RPC client.
//!
//! This module keeps full control over the WebSocket transport and JSON-RPC
//! envelopes while reusing official Solana RPC config and response types where
//! they exist.
use futures_util::SinkExt;
use futures_util::StreamExt;
/// Configuration of the minimal Solana WebSocket RPC client.
#[derive(Debug, Clone)]
pub struct KhbbSolanaWsRpcClientConfig {
/// Base WebSocket RPC endpoint.
pub url: std::string::String,
}
/// Supported WebSocket subscription kinds for the initial implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KhbbWsSubscriptionKind {
/// `slotSubscribe`
Slot,
/// `logsSubscribe`
Logs,
/// `programSubscribe`
Program,
}
impl KhbbWsSubscriptionKind {
/// Returns the JSON-RPC subscribe method name.
pub(crate) fn subscribe_method_name(&self) -> &'static str {
match self {
Self::Slot => "slotSubscribe",
Self::Logs => "logsSubscribe",
Self::Program => "programSubscribe",
}
}
/// Returns the JSON-RPC unsubscribe method name.
pub(crate) fn unsubscribe_method_name(&self) -> &'static str {
match self {
Self::Slot => "slotUnsubscribe",
Self::Logs => "logsUnsubscribe",
Self::Program => "programUnsubscribe",
}
}
}
/// Minimal JSON-RPC request envelope for WebSocket RPC calls.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(bound(
serialize = "TParams: serde::Serialize",
deserialize = "TParams: serde::de::DeserializeOwned"
))]
pub struct KhbbWsJsonRpcRequestEnvelope<TParams>
where
TParams: serde::Serialize + serde::de::DeserializeOwned,
{
/// JSON-RPC protocol version.
pub jsonrpc: std::string::String,
/// RPC method name.
pub method: std::string::String,
/// RPC parameters.
pub params: TParams,
/// Request identifier.
pub id: u64,
}
/// Minimal JSON-RPC response envelope for WebSocket RPC calls.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(bound(
serialize = "TResult: serde::Serialize",
deserialize = "TResult: serde::de::DeserializeOwned"
))]
pub struct KhbbWsJsonRpcResponseEnvelope<TResult>
where
TResult: serde::Serialize + serde::de::DeserializeOwned,
{
/// JSON-RPC protocol version.
pub jsonrpc: std::string::String,
/// Successful result if present.
pub result: std::option::Option<TResult>,
/// Raw RPC error object if present.
pub error: std::option::Option<serde_json::Value>,
/// Response identifier.
pub id: serde_json::Value,
}
/// Notification params envelope used by Solana PubSub notifications.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(bound(
serialize = "TResult: serde::Serialize",
deserialize = "TResult: serde::de::DeserializeOwned"
))]
pub struct KhbbWsNotificationParams<TResult>
where
TResult: serde::Serialize + serde::de::DeserializeOwned,
{
/// Notification result payload.
pub result: TResult,
/// Subscription identifier.
pub subscription: u64,
}
/// Notification envelope used by Solana PubSub notifications.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(bound(
serialize = "TResult: serde::Serialize",
deserialize = "TResult: serde::de::DeserializeOwned"
))]
pub struct KhbbWsNotificationEnvelope<TResult>
where
TResult: serde::Serialize + serde::de::DeserializeOwned,
{
/// JSON-RPC protocol version.
pub jsonrpc: std::string::String,
/// Notification method name.
pub method: std::string::String,
/// Notification parameters.
pub params: KhbbWsNotificationParams<TResult>,
}
/// Handle returned after a successful subscription.
#[derive(Debug, Clone)]
pub struct KhbbWsSubscriptionHandle {
/// Client-side request identifier used for the subscribe call.
pub request_id: u64,
/// Server-side subscription identifier.
pub subscription_id: u64,
/// Subscription kind.
pub kind: KhbbWsSubscriptionKind,
}
/// Output of a successful WebSocket subscribe call.
#[derive(Debug, Clone)]
pub struct KhbbWsSubscribeCallOutput {
/// Client-side request identifier used for the subscribe call.
pub request_id: u64,
/// Subscribe method name.
pub method: std::string::String,
/// Server-side subscription identifier.
pub subscription_id: u64,
/// Raw request JSON sent over the socket.
pub request_body: std::string::String,
/// Raw response JSON received from the socket.
pub response_body: std::string::String,
}
/// Minimal Solana WebSocket JSON-RPC client.
#[derive(Debug)]
pub struct KhbbSolanaWsRpcClient {
/// Client configuration.
pub(crate) config: KhbbSolanaWsRpcClientConfig,
/// Connected WebSocket stream if connected.
pub(crate) ws_stream: std::option::Option<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
}
impl KhbbSolanaWsRpcClient {
/// Creates a new minimal Solana WebSocket JSON-RPC client.
pub fn new(
config: KhbbSolanaWsRpcClientConfig,
) -> core::result::Result<Self, crate::KhbbError> {
if config.url.trim().is_empty() {
return Err(crate::KhbbError::Config {
message: std::string::String::from(
"solana websocket rpc client url must not be empty",
),
});
}
Ok(Self { config, ws_stream: None })
}
/// Connects the WebSocket client to the configured endpoint.
pub async fn connect(&mut self) -> core::result::Result<(), crate::KhbbError> {
if self.ws_stream.is_some() {
return Err(crate::KhbbError::Runtime {
context: "connect websocket client",
message: std::string::String::from("websocket client is already connected"),
});
}
let connect_result = tokio_tungstenite::connect_async(&self.config.url).await;
let (ws_stream, _) = match connect_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "connect websocket client",
message: error.to_string(),
});
},
};
self.ws_stream = Some(ws_stream);
Ok(())
}
/// Closes the current WebSocket connection if connected.
pub async fn close(&mut self) -> core::result::Result<(), crate::KhbbError> {
if self.ws_stream.is_none() {
return Ok(());
}
let ws_stream_option = self.ws_stream.as_mut();
let ws_stream = match ws_stream_option {
Some(value) => value,
None => {
return Ok(());
},
};
let close_result = ws_stream.close(None).await;
match close_result {
Ok(()) => {
self.ws_stream = None;
Ok(())
},
Err(error) => Err(crate::KhbbError::Runtime {
context: "close websocket client",
message: error.to_string(),
}),
}
}
/// Sends a generic JSON-RPC request over the current WebSocket connection.
pub async fn send_json_rpc_request<TParams>(
&mut self,
method: &str,
params: TParams,
id: u64,
) -> core::result::Result<std::string::String, crate::KhbbError>
where
TParams: serde::Serialize + serde::de::DeserializeOwned,
{
let request = KhbbWsJsonRpcRequestEnvelope {
jsonrpc: std::string::String::from("2.0"),
method: std::string::String::from(method),
params,
id,
};
let request_body_result = serde_json::to_string(&request);
let request_body = match request_body_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket json rpc request",
message: error.to_string(),
});
},
};
let ws_stream_option = self.ws_stream.as_mut();
let ws_stream = match ws_stream_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "send websocket json rpc request",
message: std::string::String::from("websocket client is not connected"),
});
},
};
let send_result = ws_stream
.send(tokio_tungstenite::tungstenite::Message::Text(request_body.clone().into()))
.await;
match send_result {
Ok(()) => Ok(request_body),
Err(error) => Err(crate::KhbbError::Runtime {
context: "send websocket json rpc request",
message: error.to_string(),
}),
}
}
/// Reads the next text message from the socket.
///
/// This method skips ping and pong frames, answers ping with pong, and
/// returns `Ok(None)` on close or end-of-stream.
pub async fn read_next_text_message(
&mut self,
) -> core::result::Result<std::option::Option<std::string::String>, crate::KhbbError> {
loop {
let ws_stream_option = self.ws_stream.as_mut();
let ws_stream = match ws_stream_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket message",
message: std::string::String::from("websocket client is not connected"),
});
},
};
let next_result = ws_stream.next().await;
let message_result = match next_result {
Some(value) => value,
None => {
return Ok(None);
},
};
let message = match message_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "read websocket message",
message: error.to_string(),
});
},
};
match message {
tokio_tungstenite::tungstenite::Message::Text(text) => {
return Ok(Some(text.to_string()));
},
tokio_tungstenite::tungstenite::Message::Binary(binary) => {
let utf8_result = std::string::String::from_utf8(binary.to_vec());
match utf8_result {
Ok(value) => {
return Ok(Some(value));
},
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "decode websocket binary message as utf-8",
message: error.to_string(),
});
},
}
},
tokio_tungstenite::tungstenite::Message::Ping(payload) => {
let pong_result = ws_stream
.send(tokio_tungstenite::tungstenite::Message::Pong(payload))
.await;
match pong_result {
Ok(()) => {},
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "send websocket pong frame",
message: error.to_string(),
});
},
}
},
tokio_tungstenite::tungstenite::Message::Pong(_) => {},
tokio_tungstenite::tungstenite::Message::Close(_) => {
return Ok(None);
},
tokio_tungstenite::tungstenite::Message::Frame(_) => {},
}
}
}
/// Performs a `slotSubscribe` call.
pub async fn slot_subscribe(
&mut self,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
self.subscribe_with_raw_response(
KhbbWsSubscriptionKind::Slot,
std::vec::Vec::<serde_json::Value>::new(),
id,
)
.await
}
/// Performs a `logsSubscribe` call.
pub async fn logs_subscribe(
&mut self,
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_logs_subscribe_params(filter, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Logs, params, id).await
}
/// Performs a `programSubscribe` call.
pub async fn program_subscribe(
&mut self,
program_id: &str,
config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError> {
let params_result = build_program_subscribe_params(program_id, config);
let params = match params_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
self.subscribe_with_raw_response(KhbbWsSubscriptionKind::Program, params, id)
.await
}
/// Performs a generic unsubscribe call.
pub async fn unsubscribe(
&mut self,
kind: KhbbWsSubscriptionKind,
subscription_id: u64,
id: u64,
) -> core::result::Result<bool, crate::KhbbError> {
let request_body_result = self
.send_json_rpc_request(
kind.unsubscribe_method_name(),
vec![serde_json::Value::Number(subscription_id.into())],
id,
)
.await;
match request_body_result {
Ok(_) => {},
Err(error) => {
return Err(error);
},
}
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket unsubscribe response",
message: std::string::String::from("websocket stream ended before response"),
});
},
};
let parse_result = parse_json_rpc_response::<bool>(&response_body);
let parsed_response = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket unsubscribe rpc error",
message: error.to_string(),
});
}
};
return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned rpc error",
message: error_text,
});
}
match parsed_response.result {
Some(value) => Ok(value),
None => Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned empty result",
message: response_body,
}),
}
}
async fn subscribe_with_raw_response<TParams>(
&mut self,
kind: KhbbWsSubscriptionKind,
params: TParams,
id: u64,
) -> core::result::Result<KhbbWsSubscribeCallOutput, crate::KhbbError>
where
TParams: serde::Serialize + serde::de::DeserializeOwned,
{
let request_body_result =
self.send_json_rpc_request(kind.subscribe_method_name(), params, id).await;
let request_body = match request_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket subscribe response",
message: std::string::String::from("websocket stream ended before response"),
});
},
};
let subscription_id_result = parse_subscription_id_response(&response_body);
let subscription_id = match subscription_id_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
Ok(KhbbWsSubscribeCallOutput {
request_id: id,
method: std::string::String::from(kind.subscribe_method_name()),
subscription_id,
request_body,
response_body,
})
}
}
/// Parses a standard JSON-RPC response envelope.
pub(crate) fn parse_json_rpc_response<TResult>(
response_body: &str,
) -> core::result::Result<KhbbWsJsonRpcResponseEnvelope<TResult>, crate::KhbbError>
where
TResult: serde::Serialize + serde::de::DeserializeOwned,
{
let parse_result =
serde_json::from_str::<KhbbWsJsonRpcResponseEnvelope<TResult>>(response_body);
match parse_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "parse websocket json rpc response",
message: error.to_string(),
}),
}
}
/// Parses a `slotNotification` payload.
pub(crate) fn parse_slot_notification(
response_body: &str,
) -> core::result::Result<
KhbbWsNotificationEnvelope<solana_rpc_client_api::response::SlotInfo>,
crate::KhbbError,
> {
let parse_result = serde_json::from_str::<
KhbbWsNotificationEnvelope<solana_rpc_client_api::response::SlotInfo>,
>(response_body);
match parse_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "parse websocket slot notification",
message: error.to_string(),
}),
}
}
/// Parses a `logsNotification` payload.
pub(crate) fn parse_logs_notification(
response_body: &str,
) -> core::result::Result<
KhbbWsNotificationEnvelope<
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcLogsResponse>,
>,
crate::KhbbError,
> {
let parse_result = serde_json::from_str::<
KhbbWsNotificationEnvelope<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcLogsResponse,
>,
>,
>(response_body);
match parse_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "parse websocket logs notification",
message: error.to_string(),
}),
}
}
/// Parses a `programNotification` payload.
pub(crate) fn parse_program_notification(
response_body: &str,
) -> core::result::Result<
KhbbWsNotificationEnvelope<
solana_rpc_client_api::response::Response<solana_rpc_client_api::response::RpcKeyedAccount>,
>,
crate::KhbbError,
> {
let parse_result = serde_json::from_str::<
KhbbWsNotificationEnvelope<
solana_rpc_client_api::response::Response<
solana_rpc_client_api::response::RpcKeyedAccount,
>,
>,
>(response_body);
match parse_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "parse websocket program notification",
message: error.to_string(),
}),
}
}
/// Parses a subscribe response and extracts the server-side subscription id.
pub(crate) fn parse_subscription_id_response(
response_body: &str,
) -> core::result::Result<u64, crate::KhbbError> {
let parse_result = parse_json_rpc_response::<u64>(response_body);
let parsed_response = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket subscribe rpc error",
message: error.to_string(),
});
}
};
return Err(crate::KhbbError::Runtime {
context: "websocket subscribe returned rpc error",
message: error_text,
});
}
match parsed_response.result {
Some(value) => Ok(value),
None => Err(crate::KhbbError::Runtime {
context: "websocket subscribe returned empty result",
message: response_body.to_string(),
}),
}
}
/// Builds the JSON-RPC params array for `logsSubscribe`.
pub(crate) fn build_logs_subscribe_params(
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: std::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
) -> core::result::Result<serde_json::Value, crate::KhbbError> {
if config.is_some() {
let to_value_result = serde_json::to_value((filter, config));
match to_value_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "serialize websocket logsSubscribe params",
message: error.to_string(),
}),
}
} else {
let to_value_result = serde_json::to_value((filter,));
match to_value_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "serialize websocket logsSubscribe params",
message: error.to_string(),
}),
}
}
}
/// Builds the JSON-RPC params array for `programSubscribe`.
pub(crate) fn build_program_subscribe_params(
program_id: &str,
config: std::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
) -> core::result::Result<serde_json::Value, crate::KhbbError> {
if program_id.trim().is_empty() {
return Err(crate::KhbbError::Config {
message: std::string::String::from("programSubscribe program_id must not be empty"),
});
}
if config.is_some() {
let to_value_result = serde_json::to_value((program_id, config));
match to_value_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "serialize websocket programSubscribe params",
message: error.to_string(),
}),
}
} else {
let to_value_result = serde_json::to_value((program_id,));
match to_value_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
context: "serialize websocket programSubscribe params",
message: error.to_string(),
}),
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn new_rejects_empty_url() {
let config = super::KhbbSolanaWsRpcClientConfig { url: std::string::String::new() };
let result = super::KhbbSolanaWsRpcClient::new(config);
assert!(result.is_err());
}
#[test]
fn new_accepts_valid_url() {
let config = super::KhbbSolanaWsRpcClientConfig {
url: std::string::String::from("wss://example.invalid"),
};
let result = super::KhbbSolanaWsRpcClient::new(config);
assert!(result.is_ok());
}
#[test]
fn subscription_kind_method_names_are_stable() {
assert_eq!(super::KhbbWsSubscriptionKind::Slot.subscribe_method_name(), "slotSubscribe");
assert_eq!(
super::KhbbWsSubscriptionKind::Slot.unsubscribe_method_name(),
"slotUnsubscribe"
);
assert_eq!(super::KhbbWsSubscriptionKind::Logs.subscribe_method_name(), "logsSubscribe");
assert_eq!(
super::KhbbWsSubscriptionKind::Program.subscribe_method_name(),
"programSubscribe"
);
}
#[test]
fn parse_subscription_id_response_accepts_success_payload() {
let body = r#"{
"jsonrpc":"2.0",
"result":42,
"id":7
}"#;
let result = super::parse_subscription_id_response(body);
assert!(result.is_ok());
assert_eq!(result.expect("subscription id"), 42);
}
#[test]
fn parse_subscription_id_response_rejects_error_payload() {
let body = r#"{
"jsonrpc":"2.0",
"error":{"code":-32000,"message":"failure"},
"id":7
}"#;
let result = super::parse_subscription_id_response(body);
assert!(result.is_err());
}
#[test]
fn parse_slot_notification_accepts_valid_payload() {
let body = r#"{
"jsonrpc":"2.0",
"method":"slotNotification",
"params":{
"result":{
"parent":75,
"root":44,
"slot":76
},
"subscription":3
}
}"#;
let result = super::parse_slot_notification(body);
assert!(result.is_ok());
let notification = result.expect("slot notification");
assert_eq!(notification.method, "slotNotification");
assert_eq!(notification.params.subscription, 3);
assert_eq!(notification.params.result.slot, 76);
}
#[test]
fn build_logs_subscribe_params_accepts_filter_only() {
let result = super::build_logs_subscribe_params(
solana_rpc_client_api::config::RpcTransactionLogsFilter::All,
None,
);
assert!(result.is_ok());
let params = result.expect("logs params");
assert!(params.is_array());
}
#[test]
fn build_program_subscribe_params_rejects_empty_program_id() {
let result = super::build_program_subscribe_params("", None);
assert!(result.is_err());
}
#[test]
fn build_program_subscribe_params_accepts_program_id_only() {
let result =
super::build_program_subscribe_params("11111111111111111111111111111111", None);
assert!(result.is_ok());
let params = result.expect("program params");
assert!(params.is_array());
}
}

View File

@@ -323,6 +323,38 @@ VALUES (?, ?, ?, ?, ?, ?, ?)
}
}
pub(crate) async fn insert_raw_ws_message(
pool: &sqlx::SqlitePool,
session_id: i64,
direction: &str,
message_text: &str,
) -> core::result::Result<(), crate::KhbbError> {
let now = chrono::Utc::now().to_rfc3339();
let insert_result = sqlx::query(
r#"
INSERT INTO raw_ws_messages (
listener_session_id,
direction,
message_text,
created_at
) VALUES (?1, ?2, ?3, ?4);
"#,
)
.bind(session_id)
.bind(direction)
.bind(message_text)
.bind(now)
.execute(pool)
.await;
match insert_result {
Ok(_) => Ok(()),
Err(error) => Err(crate::KhbbError::Database {
context: "insert raw websocket message",
message: error.to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -473,4 +505,26 @@ WHERE id = ?1;
insert_raw_http_rpc_message(&pool, session.id, 1, "getSlot", "{}", "{}", "ok").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn insert_raw_ws_message_inserts_row() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let schema_result = crate::ensure_sqlite_schema(&pool).await;
assert!(schema_result.is_ok());
let config = build_test_config(database_url);
let insert_session_result = super::insert_listener_session(&pool, &config).await;
assert!(insert_session_result.is_ok());
let session = insert_session_result.expect("insert listener session");
let insert_ws_result = super::insert_raw_ws_message(
&pool,
session.id,
"incoming",
r#"{"jsonrpc":"2.0","method":"slotNotification"}"#,
)
.await;
assert!(insert_ws_result.is_ok());
}
}