This commit is contained in:
2026-04-18 12:50:14 +02:00
parent ee6c4c3e91
commit 5ac8a7f1d5
8 changed files with 491 additions and 154 deletions

View File

@@ -25,6 +25,10 @@ pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), cra
yellowstone_grpc_url = ?config.yellowstone_grpc_url,
bootstrap_database = config.bootstrap_database,
listener_poll_interval_ms = config.listener_poll_interval_ms,
enable_ws_slot_subscribe = config.enable_ws_slot_subscribe,
enable_ws_logs_subscribe = config.enable_ws_logs_subscribe,
enable_ws_program_subscribe = config.enable_ws_program_subscribe,
ws_program_subscribe_program_ids = ?config.ws_program_subscribe_program_ids,
"khbb listener app starting"
);
let pool_result = crate::create_sqlite_pool(&config.database_url).await;

View File

@@ -19,6 +19,14 @@ pub struct KhbbAppConfig {
pub bootstrap_database: bool,
/// Polling interval used by the current runtime skeleton.
pub listener_poll_interval_ms: u64,
/// Enables or disables `slotSubscribe` during listener startup.
pub enable_ws_slot_subscribe: bool,
/// Enables or disables `logsSubscribe` during listener startup.
pub enable_ws_logs_subscribe: bool,
/// Enables or disables `programSubscribe` during listener startup.
pub enable_ws_program_subscribe: bool,
/// Program ids used when `programSubscribe` is enabled.
pub ws_program_subscribe_program_ids: std::vec::Vec<std::string::String>,
}
impl KhbbAppConfig {
@@ -80,6 +88,24 @@ impl KhbbAppConfig {
),
});
}
if self.enable_ws_program_subscribe && self.ws_program_subscribe_program_ids.is_empty() {
return Err(crate::KhbbError::Config {
message: std::string::String::from(
"ws_program_subscribe_program_ids must not be empty when enable_ws_program_subscribe is true",
),
});
}
if self.enable_ws_program_subscribe {
for program_id in &self.ws_program_subscribe_program_ids {
if program_id.trim().is_empty() {
return Err(crate::KhbbError::Config {
message: std::string::String::from(
"ws_program_subscribe_program_ids must not contain empty program ids",
),
});
}
}
}
Ok(())
}
}
@@ -101,6 +127,10 @@ mod tests {
log_filter: std::string::String::from("info"),
bootstrap_database: true,
listener_poll_interval_ms: 1000,
enable_ws_slot_subscribe: true,
enable_ws_logs_subscribe: true,
enable_ws_program_subscribe: false,
ws_program_subscribe_program_ids: vec![],
}
}
@@ -157,15 +187,18 @@ mod tests {
std::env::temp_dir().join(std::format!("khbb_config_test_{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).expect("create temp dir");
let config_path = temp_dir.join("config.json");
let config_json = r#"{
"database_url": "sqlite://./dbdata/app.db",
"solana_http_rpc_url": "https://mainnet.helius-rpc.com/?api-key=test",
"solana_ws_rpc_url": "wss://mainnet.helius-rpc.com/?api-key=test",
"yellowstone_grpc_url": "https://mainnet.helius-rpc.com:443",
"log_filter": "info",
"bootstrap_database": true,
"listener_poll_interval_ms": 1000
"database_url": "sqlite://./dbdata/app.db",
"solana_http_rpc_url": "https://mainnet.helius-rpc.com/?api-key=test",
"solana_ws_rpc_url": "wss://mainnet.helius-rpc.com/?api-key=test",
"yellowstone_grpc_url": "https://mainnet.helius-rpc.com:443",
"log_filter": "info",
"bootstrap_database": true,
"listener_poll_interval_ms": 1000,
"enable_ws_slot_subscribe": true,
"enable_ws_logs_subscribe": true,
"enable_ws_program_subscribe": false,
"ws_program_subscribe_program_ids": []
}"#;
std::fs::write(&config_path, config_json).expect("write config file");
@@ -177,4 +210,22 @@ mod tests {
let _ = std::fs::remove_file(&config_path);
let _ = std::fs::remove_dir_all(&temp_dir);
}
#[test]
fn validate_rejects_empty_program_ids_when_program_subscribe_is_enabled() {
let mut config = build_valid_config();
config.enable_ws_program_subscribe = true;
config.ws_program_subscribe_program_ids = vec![];
let result = config.validate();
assert!(result.is_err());
}
#[test]
fn validate_rejects_empty_program_id_entry() {
let mut config = build_valid_config();
config.enable_ws_program_subscribe = true;
config.ws_program_subscribe_program_ids = vec![std::string::String::from("")];
let result = config.validate();
assert!(result.is_err());
}
}

View File

@@ -1,4 +1,5 @@
// file: khbb_lib/src/listener.rs
//! Listener runtime skeleton.
//!
//! This module does not yet connect to Solana RPC, WebSocket or gRPC streams.
@@ -44,6 +45,7 @@ pub async fn run_listener_runtime(
return Err(error);
},
};
let mut ws_subscription_handles = std::vec::Vec::<crate::KhbbWsSubscriptionHandle>::new();
let ws_connect_result = ws_client.connect().await;
match ws_connect_result {
Ok(()) => {
@@ -57,58 +59,181 @@ pub async fn run_listener_runtime(
return Err(error);
},
}
let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"outgoing",
&slot_subscribe_output.request_body,
)
.await;
match insert_ws_outgoing_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store outgoing websocket subscribe request"
);
},
if config.enable_ws_slot_subscribe {
let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"outgoing",
&slot_subscribe_output.request_body,
)
.await;
match insert_ws_outgoing_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store outgoing websocket subscribe request"
);
},
}
let insert_ws_incoming_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&slot_subscribe_output.response_body,
)
.await;
match insert_ws_incoming_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket subscribe response"
);
},
}
let slot_subscription_handle = crate::KhbbWsSubscriptionHandle {
request_id: slot_subscribe_output.request_id,
subscription_id: slot_subscribe_output.subscription_id,
kind: crate::KhbbWsSubscriptionKind::Slot,
};
tracing::info!(
listener_session_id = session.id,
request_id = slot_subscription_handle.request_id,
subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription established"
);
ws_subscription_handles.push(slot_subscription_handle);
}
let insert_ws_incoming_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&slot_subscribe_output.response_body,
)
.await;
match insert_ws_incoming_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket subscribe response"
);
},
if config.enable_ws_logs_subscribe {
let logs_subscribe_result = ws_client
.logs_subscribe(solana_rpc_client_api::config::RpcTransactionLogsFilter::All, None, 2)
.await;
let logs_subscribe_output = match logs_subscribe_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"outgoing",
&logs_subscribe_output.request_body,
)
.await;
match insert_ws_outgoing_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store outgoing websocket logs subscribe request"
);
},
}
let insert_ws_incoming_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&logs_subscribe_output.response_body,
)
.await;
match insert_ws_incoming_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to store incoming websocket logs subscribe response"
);
},
}
let logs_subscription_handle = crate::KhbbWsSubscriptionHandle {
request_id: logs_subscribe_output.request_id,
subscription_id: logs_subscribe_output.subscription_id,
kind: crate::KhbbWsSubscriptionKind::Logs,
};
tracing::info!(
listener_session_id = session.id,
request_id = logs_subscription_handle.request_id,
subscription_id = logs_subscription_handle.subscription_id,
"logs websocket subscription established"
);
ws_subscription_handles.push(logs_subscription_handle);
}
if config.enable_ws_program_subscribe {
let mut program_request_id: u64 = 10;
for program_id in &config.ws_program_subscribe_program_ids {
let program_subscribe_result =
ws_client.program_subscribe(program_id, None, program_request_id).await;
let program_subscribe_output = match program_subscribe_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"outgoing",
&program_subscribe_output.request_body,
)
.await;
match insert_ws_outgoing_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
program_id = %program_id,
"failed to store outgoing websocket program subscribe request"
);
},
}
let insert_ws_incoming_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
"incoming",
&program_subscribe_output.response_body,
)
.await;
match insert_ws_incoming_result {
Ok(()) => {},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
program_id = %program_id,
"failed to store incoming websocket program subscribe response"
);
},
}
let program_subscription_handle = crate::KhbbWsSubscriptionHandle {
request_id: program_subscribe_output.request_id,
subscription_id: program_subscribe_output.subscription_id,
kind: crate::KhbbWsSubscriptionKind::Program,
};
tracing::info!(
listener_session_id = session.id,
request_id = program_subscription_handle.request_id,
subscription_id = program_subscription_handle.subscription_id,
program_id = %program_id,
"program websocket subscription established"
);
ws_subscription_handles.push(program_subscription_handle);
program_request_id = program_request_id.saturating_add(1);
}
}
let slot_subscription_handle = crate::KhbbWsSubscriptionHandle {
request_id: slot_subscribe_output.request_id,
subscription_id: slot_subscribe_output.subscription_id,
kind: crate::KhbbWsSubscriptionKind::Slot,
};
tracing::info!(
listener_session_id = session.id,
request_id = slot_subscription_handle.request_id,
subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription established"
);
let mut final_status = std::string::String::from("stopped");
loop {
tokio::select! {
@@ -190,6 +315,102 @@ pub async fn run_listener_runtime(
);
}
}
let method_value_result =
serde_json::from_str::<serde_json::Value>(&message_text);
match method_value_result {
Ok(json_value) => {
let method_option = json_value
.get("method")
.and_then(serde_json::Value::as_str);
match method_option {
Some("slotNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_slot_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
slot = notification.params.result.slot,
parent = notification.params.result.parent,
root = notification.params.result.root,
"parsed slot notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse slot notification"
);
}
}
}
Some("logsNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_logs_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
signature = %notification.params.result.value.signature,
"parsed logs notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse logs notification"
);
}
}
}
Some("programNotification") => {
let parse_result =
crate::solana_rpc_ws::parse_program_notification(&message_text);
match parse_result {
Ok(notification) => {
tracing::trace!(
listener_session_id = session.id,
subscription_id = notification.params.subscription,
program_pubkey = %notification.params.result.value.pubkey,
"parsed program notification"
);
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to parse program notification"
);
}
}
}
Some(other_method) => {
tracing::trace!(
listener_session_id = session.id,
method = %other_method,
"received unsupported websocket notification method"
);
}
None => {
tracing::trace!(
listener_session_id = session.id,
"received websocket json message without notification method"
);
}
}
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to decode websocket message as json value"
);
}
}
}
Ok(None) => {
tracing::info!(
@@ -250,30 +471,34 @@ pub async fn run_listener_runtime(
}
}
}
let unsubscribe_result = ws_client
.unsubscribe(
slot_subscription_handle.kind,
slot_subscription_handle.subscription_id,
tick_count.saturating_add(10),
)
.await;
match unsubscribe_result {
Ok(value) => {
tracing::info!(
listener_session_id = session.id,
unsubscribed = value,
subscription_id = slot_subscription_handle.subscription_id,
"slot websocket subscription cancelled"
);
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
subscription_id = slot_subscription_handle.subscription_id,
"failed to cancel slot websocket subscription"
);
},
for subscription_handle in &ws_subscription_handles {
let unsubscribe_result = ws_client
.unsubscribe(
subscription_handle.kind,
subscription_handle.subscription_id,
tick_count.saturating_add(subscription_handle.request_id),
)
.await;
match unsubscribe_result {
Ok(value) => {
tracing::info!(
listener_session_id = session.id,
unsubscribed = value,
subscription_id = subscription_handle.subscription_id,
kind = ?subscription_handle.kind,
"websocket subscription cancelled"
);
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
subscription_id = subscription_handle.subscription_id,
kind = ?subscription_handle.kind,
"failed to cancel websocket subscription"
);
},
}
}
let ws_close_result = ws_client.close().await;
match ws_close_result {

View File

@@ -1,4 +1,5 @@
// file: khbb_lib/src/solana_rpc_ws.rs
//! Minimal Solana WebSocket JSON-RPC client.
//!
//! This module keeps full control over the WebSocket transport and JSON-RPC
@@ -412,51 +413,71 @@ impl KhbbSolanaWsRpcClient {
return Err(error);
},
}
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket unsubscribe response",
message: std::string::String::from("websocket stream ended before response"),
});
},
};
let parse_result = parse_json_rpc_response::<bool>(&response_body);
let parsed_response = match parse_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
loop {
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket unsubscribe response",
message: std::string::String::from(
"websocket stream ended before unsubscribe response",
),
});
},
};
let json_value_result = serde_json::from_str::<serde_json::Value>(&response_body);
let json_value = match json_value_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket unsubscribe rpc error",
context: "decode websocket unsubscribe response as json value",
message: error.to_string(),
});
}
},
};
return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned rpc error",
message: error_text,
});
}
match parsed_response.result {
Some(value) => Ok(value),
None => Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned empty result",
message: response_body,
}),
let id_value_option = json_value.get("id");
if id_value_option.is_none() {
continue;
}
let parsed_response_result = parse_json_rpc_response::<bool>(&response_body);
let parsed_response = match parsed_response_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(error_value) = parsed_response.error {
let error_text_result = serde_json::to_string(&error_value);
let error_text = match error_text_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "serialize websocket unsubscribe rpc error",
message: error.to_string(),
});
},
};
return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned rpc error",
message: error_text,
});
}
match parsed_response.result {
Some(value) => return Ok(value),
None => {
return Err(crate::KhbbError::Runtime {
context: "websocket unsubscribe returned empty result",
message: response_body,
});
},
}
}
}
@@ -477,36 +498,54 @@ impl KhbbSolanaWsRpcClient {
return Err(error);
},
};
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket subscribe response",
message: std::string::String::from("websocket stream ended before response"),
});
},
};
let subscription_id_result = parse_subscription_id_response(&response_body);
let subscription_id = match subscription_id_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
Ok(KhbbWsSubscribeCallOutput {
request_id: id,
method: std::string::String::from(kind.subscribe_method_name()),
subscription_id,
request_body,
response_body,
})
loop {
let response_body_result = self.read_next_text_message().await;
let response_body_option = match response_body_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let response_body = match response_body_option {
Some(value) => value,
None => {
return Err(crate::KhbbError::Runtime {
context: "read websocket subscribe response",
message: std::string::String::from(
"websocket stream ended before subscribe response",
),
});
},
};
let json_value_result = serde_json::from_str::<serde_json::Value>(&response_body);
let json_value = match json_value_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Json {
context: "decode websocket subscribe response as json value",
message: error.to_string(),
});
},
};
let id_value_option = json_value.get("id");
if id_value_option.is_none() {
continue;
}
let subscription_id_result = parse_subscription_id_response(&response_body);
let subscription_id = match subscription_id_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
return Ok(KhbbWsSubscribeCallOutput {
request_id: id,
method: std::string::String::from(kind.subscribe_method_name()),
subscription_id,
request_body,
response_body,
});
}
}
}
@@ -519,7 +558,6 @@ where
{
let parse_result =
serde_json::from_str::<KhbbWsJsonRpcResponseEnvelope<TResult>>(response_body);
match parse_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KhbbError::Json {
@@ -618,7 +656,7 @@ pub(crate) fn parse_subscription_id_response(
context: "serialize websocket subscribe rpc error",
message: error.to_string(),
});
}
},
};
return Err(crate::KhbbError::Runtime {
context: "websocket subscribe returned rpc error",

View File

@@ -374,6 +374,10 @@ mod tests {
log_filter: std::string::String::from("info"),
bootstrap_database: true,
listener_poll_interval_ms: 1000,
enable_ws_slot_subscribe: true,
enable_ws_logs_subscribe: true,
enable_ws_program_subscribe: false,
ws_program_subscribe_program_ids: vec![],
}
}
@@ -497,6 +501,10 @@ WHERE id = ?1;
log_filter: "info".into(),
bootstrap_database: false,
listener_poll_interval_ms: 1000,
enable_ws_slot_subscribe: true,
enable_ws_logs_subscribe: true,
enable_ws_program_subscribe: false,
ws_program_subscribe_program_ids: vec![],
},
)
.await