diff --git a/Cargo.toml b/Cargo.toml index a54b9c9..9ddbf64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.2" +version = "0.4.3" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/app.rs b/khbb_lib/src/app.rs index 609aa4d..992316e 100644 --- a/khbb_lib/src/app.rs +++ b/khbb_lib/src/app.rs @@ -25,6 +25,10 @@ pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), cra yellowstone_grpc_url = ?config.yellowstone_grpc_url, bootstrap_database = config.bootstrap_database, listener_poll_interval_ms = config.listener_poll_interval_ms, + enable_ws_slot_subscribe = config.enable_ws_slot_subscribe, + enable_ws_logs_subscribe = config.enable_ws_logs_subscribe, + enable_ws_program_subscribe = config.enable_ws_program_subscribe, + ws_program_subscribe_program_ids = ?config.ws_program_subscribe_program_ids, "khbb listener app starting" ); let pool_result = crate::create_sqlite_pool(&config.database_url).await; diff --git a/khbb_lib/src/config.rs b/khbb_lib/src/config.rs index b4b06e7..0d08408 100644 --- a/khbb_lib/src/config.rs +++ b/khbb_lib/src/config.rs @@ -19,6 +19,14 @@ pub struct KhbbAppConfig { pub bootstrap_database: bool, /// Polling interval used by the current runtime skeleton. pub listener_poll_interval_ms: u64, + /// Enables or disables `slotSubscribe` during listener startup. + pub enable_ws_slot_subscribe: bool, + /// Enables or disables `logsSubscribe` during listener startup. + pub enable_ws_logs_subscribe: bool, + /// Enables or disables `programSubscribe` during listener startup. + pub enable_ws_program_subscribe: bool, + /// Program ids used when `programSubscribe` is enabled. + pub ws_program_subscribe_program_ids: std::vec::Vec, } impl KhbbAppConfig { @@ -80,6 +88,24 @@ impl KhbbAppConfig { ), }); } + if self.enable_ws_program_subscribe && self.ws_program_subscribe_program_ids.is_empty() { + return Err(crate::KhbbError::Config { + message: std::string::String::from( + "ws_program_subscribe_program_ids must not be empty when enable_ws_program_subscribe is true", + ), + }); + } + if self.enable_ws_program_subscribe { + for program_id in &self.ws_program_subscribe_program_ids { + if program_id.trim().is_empty() { + return Err(crate::KhbbError::Config { + message: std::string::String::from( + "ws_program_subscribe_program_ids must not contain empty program ids", + ), + }); + } + } + } Ok(()) } } @@ -101,6 +127,10 @@ mod tests { log_filter: std::string::String::from("info"), bootstrap_database: true, listener_poll_interval_ms: 1000, + enable_ws_slot_subscribe: true, + enable_ws_logs_subscribe: true, + enable_ws_program_subscribe: false, + ws_program_subscribe_program_ids: vec![], } } @@ -157,15 +187,18 @@ mod tests { std::env::temp_dir().join(std::format!("khbb_config_test_{}", uuid::Uuid::new_v4())); std::fs::create_dir_all(&temp_dir).expect("create temp dir"); let config_path = temp_dir.join("config.json"); - let config_json = r#"{ - "database_url": "sqlite://./dbdata/app.db", - "solana_http_rpc_url": "https://mainnet.helius-rpc.com/?api-key=test", - "solana_ws_rpc_url": "wss://mainnet.helius-rpc.com/?api-key=test", - "yellowstone_grpc_url": "https://mainnet.helius-rpc.com:443", - "log_filter": "info", - "bootstrap_database": true, - "listener_poll_interval_ms": 1000 + "database_url": "sqlite://./dbdata/app.db", + "solana_http_rpc_url": "https://mainnet.helius-rpc.com/?api-key=test", + "solana_ws_rpc_url": "wss://mainnet.helius-rpc.com/?api-key=test", + "yellowstone_grpc_url": "https://mainnet.helius-rpc.com:443", + "log_filter": "info", + "bootstrap_database": true, + "listener_poll_interval_ms": 1000, + "enable_ws_slot_subscribe": true, + "enable_ws_logs_subscribe": true, + "enable_ws_program_subscribe": false, + "ws_program_subscribe_program_ids": [] }"#; std::fs::write(&config_path, config_json).expect("write config file"); @@ -177,4 +210,22 @@ mod tests { let _ = std::fs::remove_file(&config_path); let _ = std::fs::remove_dir_all(&temp_dir); } + + #[test] + fn validate_rejects_empty_program_ids_when_program_subscribe_is_enabled() { + let mut config = build_valid_config(); + config.enable_ws_program_subscribe = true; + config.ws_program_subscribe_program_ids = vec![]; + let result = config.validate(); + assert!(result.is_err()); + } + + #[test] + fn validate_rejects_empty_program_id_entry() { + let mut config = build_valid_config(); + config.enable_ws_program_subscribe = true; + config.ws_program_subscribe_program_ids = vec![std::string::String::from("")]; + let result = config.validate(); + assert!(result.is_err()); + } } diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index ce2ec6e..48a151e 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -1,4 +1,5 @@ // file: khbb_lib/src/listener.rs + //! Listener runtime skeleton. //! //! This module does not yet connect to Solana RPC, WebSocket or gRPC streams. @@ -44,6 +45,7 @@ pub async fn run_listener_runtime( return Err(error); }, }; + let mut ws_subscription_handles = std::vec::Vec::::new(); let ws_connect_result = ws_client.connect().await; match ws_connect_result { Ok(()) => { @@ -57,58 +59,181 @@ pub async fn run_listener_runtime( return Err(error); }, } - let slot_subscribe_result = ws_client.slot_subscribe(1).await; - let slot_subscribe_output = match slot_subscribe_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message( - pool, - session.id, - "outgoing", - &slot_subscribe_output.request_body, - ) - .await; - match insert_ws_outgoing_result { - Ok(()) => {}, - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to store outgoing websocket subscribe request" - ); - }, + if config.enable_ws_slot_subscribe { + let slot_subscribe_result = ws_client.slot_subscribe(1).await; + let slot_subscribe_output = match slot_subscribe_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "outgoing", + &slot_subscribe_output.request_body, + ) + .await; + match insert_ws_outgoing_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store outgoing websocket subscribe request" + ); + }, + } + let insert_ws_incoming_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &slot_subscribe_output.response_body, + ) + .await; + match insert_ws_incoming_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming websocket subscribe response" + ); + }, + } + let slot_subscription_handle = crate::KhbbWsSubscriptionHandle { + request_id: slot_subscribe_output.request_id, + subscription_id: slot_subscribe_output.subscription_id, + kind: crate::KhbbWsSubscriptionKind::Slot, + }; + tracing::info!( + listener_session_id = session.id, + request_id = slot_subscription_handle.request_id, + subscription_id = slot_subscription_handle.subscription_id, + "slot websocket subscription established" + ); + ws_subscription_handles.push(slot_subscription_handle); } - let insert_ws_incoming_result = crate::storage::insert_raw_ws_message( - pool, - session.id, - "incoming", - &slot_subscribe_output.response_body, - ) - .await; - match insert_ws_incoming_result { - Ok(()) => {}, - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to store incoming websocket subscribe response" - ); - }, + if config.enable_ws_logs_subscribe { + let logs_subscribe_result = ws_client + .logs_subscribe(solana_rpc_client_api::config::RpcTransactionLogsFilter::All, None, 2) + .await; + let logs_subscribe_output = match logs_subscribe_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "outgoing", + &logs_subscribe_output.request_body, + ) + .await; + match insert_ws_outgoing_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store outgoing websocket logs subscribe request" + ); + }, + } + let insert_ws_incoming_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &logs_subscribe_output.response_body, + ) + .await; + match insert_ws_incoming_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to store incoming websocket logs subscribe response" + ); + }, + } + let logs_subscription_handle = crate::KhbbWsSubscriptionHandle { + request_id: logs_subscribe_output.request_id, + subscription_id: logs_subscribe_output.subscription_id, + kind: crate::KhbbWsSubscriptionKind::Logs, + }; + tracing::info!( + listener_session_id = session.id, + request_id = logs_subscription_handle.request_id, + subscription_id = logs_subscription_handle.subscription_id, + "logs websocket subscription established" + ); + ws_subscription_handles.push(logs_subscription_handle); + } + if config.enable_ws_program_subscribe { + let mut program_request_id: u64 = 10; + for program_id in &config.ws_program_subscribe_program_ids { + let program_subscribe_result = + ws_client.program_subscribe(program_id, None, program_request_id).await; + let program_subscribe_output = match program_subscribe_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let insert_ws_outgoing_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "outgoing", + &program_subscribe_output.request_body, + ) + .await; + match insert_ws_outgoing_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + program_id = %program_id, + "failed to store outgoing websocket program subscribe request" + ); + }, + } + let insert_ws_incoming_result = crate::storage::insert_raw_ws_message( + pool, + session.id, + "incoming", + &program_subscribe_output.response_body, + ) + .await; + match insert_ws_incoming_result { + Ok(()) => {}, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + program_id = %program_id, + "failed to store incoming websocket program subscribe response" + ); + }, + } + let program_subscription_handle = crate::KhbbWsSubscriptionHandle { + request_id: program_subscribe_output.request_id, + subscription_id: program_subscribe_output.subscription_id, + kind: crate::KhbbWsSubscriptionKind::Program, + }; + tracing::info!( + listener_session_id = session.id, + request_id = program_subscription_handle.request_id, + subscription_id = program_subscription_handle.subscription_id, + program_id = %program_id, + "program websocket subscription established" + ); + ws_subscription_handles.push(program_subscription_handle); + program_request_id = program_request_id.saturating_add(1); + } } - let slot_subscription_handle = crate::KhbbWsSubscriptionHandle { - request_id: slot_subscribe_output.request_id, - subscription_id: slot_subscribe_output.subscription_id, - kind: crate::KhbbWsSubscriptionKind::Slot, - }; - tracing::info!( - listener_session_id = session.id, - request_id = slot_subscription_handle.request_id, - subscription_id = slot_subscription_handle.subscription_id, - "slot websocket subscription established" - ); let mut final_status = std::string::String::from("stopped"); loop { tokio::select! { @@ -190,6 +315,102 @@ pub async fn run_listener_runtime( ); } } + let method_value_result = + serde_json::from_str::(&message_text); + match method_value_result { + Ok(json_value) => { + let method_option = json_value + .get("method") + .and_then(serde_json::Value::as_str); + match method_option { + Some("slotNotification") => { + let parse_result = + crate::solana_rpc_ws::parse_slot_notification(&message_text); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + slot = notification.params.result.slot, + parent = notification.params.result.parent, + root = notification.params.result.root, + "parsed slot notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse slot notification" + ); + } + } + } + Some("logsNotification") => { + let parse_result = + crate::solana_rpc_ws::parse_logs_notification(&message_text); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + signature = %notification.params.result.value.signature, + "parsed logs notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse logs notification" + ); + } + } + } + Some("programNotification") => { + let parse_result = + crate::solana_rpc_ws::parse_program_notification(&message_text); + match parse_result { + Ok(notification) => { + tracing::trace!( + listener_session_id = session.id, + subscription_id = notification.params.subscription, + program_pubkey = %notification.params.result.value.pubkey, + "parsed program notification" + ); + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to parse program notification" + ); + } + } + } + Some(other_method) => { + tracing::trace!( + listener_session_id = session.id, + method = %other_method, + "received unsupported websocket notification method" + ); + } + None => { + tracing::trace!( + listener_session_id = session.id, + "received websocket json message without notification method" + ); + } + } + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to decode websocket message as json value" + ); + } + } } Ok(None) => { tracing::info!( @@ -250,30 +471,34 @@ pub async fn run_listener_runtime( } } } - let unsubscribe_result = ws_client - .unsubscribe( - slot_subscription_handle.kind, - slot_subscription_handle.subscription_id, - tick_count.saturating_add(10), - ) - .await; - match unsubscribe_result { - Ok(value) => { - tracing::info!( - listener_session_id = session.id, - unsubscribed = value, - subscription_id = slot_subscription_handle.subscription_id, - "slot websocket subscription cancelled" - ); - }, - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - subscription_id = slot_subscription_handle.subscription_id, - "failed to cancel slot websocket subscription" - ); - }, + for subscription_handle in &ws_subscription_handles { + let unsubscribe_result = ws_client + .unsubscribe( + subscription_handle.kind, + subscription_handle.subscription_id, + tick_count.saturating_add(subscription_handle.request_id), + ) + .await; + match unsubscribe_result { + Ok(value) => { + tracing::info!( + listener_session_id = session.id, + unsubscribed = value, + subscription_id = subscription_handle.subscription_id, + kind = ?subscription_handle.kind, + "websocket subscription cancelled" + ); + }, + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + subscription_id = subscription_handle.subscription_id, + kind = ?subscription_handle.kind, + "failed to cancel websocket subscription" + ); + }, + } } let ws_close_result = ws_client.close().await; match ws_close_result { diff --git a/khbb_lib/src/solana_rpc_ws.rs b/khbb_lib/src/solana_rpc_ws.rs index c0559c6..a72278c 100644 --- a/khbb_lib/src/solana_rpc_ws.rs +++ b/khbb_lib/src/solana_rpc_ws.rs @@ -1,4 +1,5 @@ // file: khbb_lib/src/solana_rpc_ws.rs + //! Minimal Solana WebSocket JSON-RPC client. //! //! This module keeps full control over the WebSocket transport and JSON-RPC @@ -412,51 +413,71 @@ impl KhbbSolanaWsRpcClient { return Err(error); }, } - let response_body_result = self.read_next_text_message().await; - let response_body_option = match response_body_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - let response_body = match response_body_option { - Some(value) => value, - None => { - return Err(crate::KhbbError::Runtime { - context: "read websocket unsubscribe response", - message: std::string::String::from("websocket stream ended before response"), - }); - }, - }; - let parse_result = parse_json_rpc_response::(&response_body); - let parsed_response = match parse_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - if let Some(error_value) = parsed_response.error { - let error_text_result = serde_json::to_string(&error_value); - let error_text = match error_text_result { + loop { + let response_body_result = self.read_next_text_message().await; + let response_body_option = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body = match response_body_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "read websocket unsubscribe response", + message: std::string::String::from( + "websocket stream ended before unsubscribe response", + ), + }); + }, + }; + let json_value_result = serde_json::from_str::(&response_body); + let json_value = match json_value_result { Ok(value) => value, Err(error) => { return Err(crate::KhbbError::Json { - context: "serialize websocket unsubscribe rpc error", + context: "decode websocket unsubscribe response as json value", message: error.to_string(), }); - } + }, }; - return Err(crate::KhbbError::Runtime { - context: "websocket unsubscribe returned rpc error", - message: error_text, - }); - } - match parsed_response.result { - Some(value) => Ok(value), - None => Err(crate::KhbbError::Runtime { - context: "websocket unsubscribe returned empty result", - message: response_body, - }), + let id_value_option = json_value.get("id"); + if id_value_option.is_none() { + continue; + } + let parsed_response_result = parse_json_rpc_response::(&response_body); + let parsed_response = match parsed_response_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + if let Some(error_value) = parsed_response.error { + let error_text_result = serde_json::to_string(&error_value); + let error_text = match error_text_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "serialize websocket unsubscribe rpc error", + message: error.to_string(), + }); + }, + }; + return Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned rpc error", + message: error_text, + }); + } + match parsed_response.result { + Some(value) => return Ok(value), + None => { + return Err(crate::KhbbError::Runtime { + context: "websocket unsubscribe returned empty result", + message: response_body, + }); + }, + } } } @@ -477,36 +498,54 @@ impl KhbbSolanaWsRpcClient { return Err(error); }, }; - let response_body_result = self.read_next_text_message().await; - let response_body_option = match response_body_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - let response_body = match response_body_option { - Some(value) => value, - None => { - return Err(crate::KhbbError::Runtime { - context: "read websocket subscribe response", - message: std::string::String::from("websocket stream ended before response"), - }); - }, - }; - let subscription_id_result = parse_subscription_id_response(&response_body); - let subscription_id = match subscription_id_result { - Ok(value) => value, - Err(error) => { - return Err(error); - }, - }; - Ok(KhbbWsSubscribeCallOutput { - request_id: id, - method: std::string::String::from(kind.subscribe_method_name()), - subscription_id, - request_body, - response_body, - }) + loop { + let response_body_result = self.read_next_text_message().await; + let response_body_option = match response_body_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + let response_body = match response_body_option { + Some(value) => value, + None => { + return Err(crate::KhbbError::Runtime { + context: "read websocket subscribe response", + message: std::string::String::from( + "websocket stream ended before subscribe response", + ), + }); + }, + }; + let json_value_result = serde_json::from_str::(&response_body); + let json_value = match json_value_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Json { + context: "decode websocket subscribe response as json value", + message: error.to_string(), + }); + }, + }; + let id_value_option = json_value.get("id"); + if id_value_option.is_none() { + continue; + } + let subscription_id_result = parse_subscription_id_response(&response_body); + let subscription_id = match subscription_id_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + return Ok(KhbbWsSubscribeCallOutput { + request_id: id, + method: std::string::String::from(kind.subscribe_method_name()), + subscription_id, + request_body, + response_body, + }); + } } } @@ -519,7 +558,6 @@ where { let parse_result = serde_json::from_str::>(response_body); - match parse_result { Ok(value) => Ok(value), Err(error) => Err(crate::KhbbError::Json { @@ -618,7 +656,7 @@ pub(crate) fn parse_subscription_id_response( context: "serialize websocket subscribe rpc error", message: error.to_string(), }); - } + }, }; return Err(crate::KhbbError::Runtime { context: "websocket subscribe returned rpc error", diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs index ff1e480..c729720 100644 --- a/khbb_lib/src/storage.rs +++ b/khbb_lib/src/storage.rs @@ -374,6 +374,10 @@ mod tests { log_filter: std::string::String::from("info"), bootstrap_database: true, listener_poll_interval_ms: 1000, + enable_ws_slot_subscribe: true, + enable_ws_logs_subscribe: true, + enable_ws_program_subscribe: false, + ws_program_subscribe_program_ids: vec![], } } @@ -497,6 +501,10 @@ WHERE id = ?1; log_filter: "info".into(), bootstrap_database: false, listener_poll_interval_ms: 1000, + enable_ws_slot_subscribe: true, + enable_ws_logs_subscribe: true, + enable_ws_program_subscribe: false, + ws_program_subscribe_program_ids: vec![], }, ) .await diff --git a/khbb_listener_app/Cargo.toml b/khbb_listener_app/Cargo.toml index 269f3c5..2ebac11 100644 --- a/khbb_listener_app/Cargo.toml +++ b/khbb_listener_app/Cargo.toml @@ -10,5 +10,6 @@ publish.workspace = true [dependencies] khbb_lib = { path = "../khbb_lib" } +rustls.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/khbb_listener_app/src/main.rs b/khbb_listener_app/src/main.rs index 12a0f18..ff797e9 100644 --- a/khbb_listener_app/src/main.rs +++ b/khbb_listener_app/src/main.rs @@ -10,6 +10,16 @@ /// Entrypoint of the khbb listener binary. #[tokio::main] async fn main() -> std::process::ExitCode { + if rustls::crypto::CryptoProvider::get_default().is_none() { + let provider_result = rustls::crypto::aws_lc_rs::default_provider().install_default(); + match provider_result { + Ok(()) => {}, + Err(error) => { + eprintln!("khbb_listener_app rustls provider init error: {:?}", error); + return std::process::ExitCode::FAILURE; + }, + } + } let args = std::env::args().collect::>(); let config_path = if args.len() >= 2 { args[1].as_str() } else { "config.json" }; let run_result = khbb_lib::run_listener_app(config_path).await;