// file: kb_lib/src/http_pool.rs //! HTTP endpoint pool and routing. //! //! This module provides a lightweight endpoint pool on top of `HttpClient`. //! It is responsible for: //! - filtering endpoints by role //! - skipping paused or disabled endpoints //! - simple round-robin selection among active endpoints //! - preferring endpoints with available concurrency slots /// Snapshot of one pooled HTTP endpoint. #[derive(Clone, Debug, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct KbHttpPoolClientSnapshot { /// Logical endpoint name. pub endpoint_name: std::string::String, /// Provider name. pub provider: std::string::String, /// Resolved endpoint URL. pub endpoint_url: std::string::String, /// Supported roles. pub roles: std::vec::Vec, /// Current endpoint status string. pub status: std::string::String, /// Remaining pause duration in milliseconds when paused. pub paused_remaining_ms: std::option::Option, /// Available concurrency slots. pub available_concurrency_slots: usize, } /// Pool of HTTP endpoints. #[derive(Clone, Debug)] pub struct HttpEndpointPool { clients: std::vec::Vec, next_index: std::sync::Arc, } impl HttpEndpointPool { /// Creates a pool from already constructed clients. pub fn new(clients: std::vec::Vec) -> Result { if clients.is_empty() { return Err(crate::KbError::Config( "http endpoint pool requires at least one client".to_string(), )); } Ok(Self { clients, next_index: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)), }) } /// Creates a pool from endpoint configurations. pub fn from_endpoint_configs( endpoint_configs: std::vec::Vec, ) -> Result { let mut clients = std::vec::Vec::new(); for endpoint in endpoint_configs { if !endpoint.enabled { continue; } let client_result = crate::HttpClient::new(endpoint); let client = match client_result { Ok(client) => client, Err(error) => return Err(error), }; clients.push(client); } Self::new(clients) } /// Creates a pool from the global configuration. pub fn from_config(config: &crate::KbConfig) -> Result { Self::from_endpoint_configs(config.solana.http_endpoints.clone()) } /// Returns the number of pooled clients. pub fn len(&self) -> usize { self.clients.len() } /// Returns whether the pool is empty. pub fn is_empty(&self) -> bool { self.clients.is_empty() } /// Returns a live snapshot of pooled endpoints. pub async fn snapshot(&self) -> std::vec::Vec { let mut snapshots = std::vec::Vec::new(); for client in &self.clients { let status = client.endpoint_status().await; let (status_text, paused_remaining_ms) = match status { crate::KbHttpEndpointStatus::Active => ("Active".to_string(), None), crate::KbHttpEndpointStatus::Disabled => ("Disabled".to_string(), None), crate::KbHttpEndpointStatus::Paused { remaining_ms } => { ("Paused".to_string(), Some(remaining_ms)) } }; snapshots.push(KbHttpPoolClientSnapshot { endpoint_name: client.endpoint_name().to_string(), provider: client.endpoint_config().provider.clone(), endpoint_url: client.endpoint_url().to_string(), roles: client.endpoint_config().roles.clone(), status: status_text, paused_remaining_ms, available_concurrency_slots: client.available_concurrency_slots(), }); } snapshots } /// Selects one client for a role and method. pub async fn select_client_for_role_and_method( &self, required_role: &str, method: &str, ) -> Result { let method_class = crate::HttpClient::classify_method(method); let _ = method_class; let mut active_indices = std::vec::Vec::new(); let mut paused_count = 0usize; let mut disabled_count = 0usize; let mut role_mismatch_count = 0usize; let client_count = self.clients.len(); let mut index = 0usize; while index < client_count { let client = &self.clients[index]; if !client.supports_role(required_role) { role_mismatch_count += 1; index += 1; continue; } let status = client.endpoint_status().await; match status { crate::KbHttpEndpointStatus::Active => { active_indices.push(index); } crate::KbHttpEndpointStatus::Paused { .. } => { paused_count += 1; } crate::KbHttpEndpointStatus::Disabled => { disabled_count += 1; } } index += 1; } if active_indices.is_empty() { return Err(crate::KbError::Http(format!( "no active http endpoint available for role '{}' and method '{}': paused={}, disabled={}, role_mismatch={}", required_role, method, paused_count, disabled_count, role_mismatch_count ))); } let rotation_seed = self .next_index .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let active_len = active_indices.len(); let mut offset = 0usize; while offset < active_len { let active_index = active_indices[(rotation_seed + offset) % active_len]; let client = &self.clients[active_index]; if client.available_concurrency_slots() > 0 { return Ok(client.clone()); } offset += 1; } let fallback_index = active_indices[rotation_seed % active_len]; Ok(self.clients[fallback_index].clone()) } /// Executes one raw JSON-RPC request through the pool. pub async fn execute_json_rpc_result_raw_for_role( &self, required_role: &str, method: std::string::String, params: std::vec::Vec, ) -> Result { let client_result = self .select_client_for_role_and_method(required_role, &method) .await; let client = match client_result { Ok(client) => client, Err(error) => return Err(error), }; client.execute_json_rpc_result_raw(method, params).await } /// Executes one typed JSON-RPC request through the pool. pub async fn execute_json_rpc_request_typed_for_role( &self, required_role: &str, method: std::string::String, params: std::vec::Vec, ) -> Result where T: serde::de::DeserializeOwned, { let client_result = self .select_client_for_role_and_method(required_role, &method) .await; let client = match client_result { Ok(client) => client, Err(error) => return Err(error), }; client .execute_json_rpc_request_typed::(method, params) .await } /// Executes `getHealth` through the pool. pub async fn get_health_for_role( &self, required_role: &str, ) -> Result { self.execute_json_rpc_request_typed_for_role::( required_role, "getHealth".to_string(), std::vec::Vec::new(), ) .await } /// Executes `getSlot` through the pool. pub async fn get_slot_for_role( &self, required_role: &str, config: std::option::Option, ) -> Result { let config_value_result = kb_pool_serialize_optional_json_value(config, "pool getSlot config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = kb_pool_build_optional_config_only_params(config_value); self.execute_json_rpc_request_typed_for_role::( required_role, "getSlot".to_string(), params, ) .await } /// Executes `sendTransaction` through the pool. pub async fn send_transaction_for_role( &self, required_role: &str, encoded_transaction: std::string::String, config: std::option::Option, ) -> Result { let config_value_result = kb_pool_serialize_optional_json_value(config, "pool sendTransaction config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = kb_pool_build_first_string_optional_config_params(encoded_transaction, config_value); self.execute_json_rpc_request_typed_for_role::( required_role, "sendTransaction".to_string(), params, ) .await } } fn kb_pool_build_optional_config_only_params( config: std::option::Option, ) -> std::vec::Vec { let mut params = std::vec::Vec::new(); if let Some(config) = config { params.push(config); } params } fn kb_pool_build_first_string_optional_config_params( first: std::string::String, config: std::option::Option, ) -> std::vec::Vec { let mut params = vec![serde_json::Value::String(first)]; if let Some(config) = config { params.push(config); } params } fn kb_pool_serialize_optional_json_value( value: std::option::Option, label: &str, ) -> Result, crate::KbError> where T: serde::Serialize, { match value { Some(value) => { let value_result = serde_json::to_value(value); match value_result { Ok(value) => Ok(Some(value)), Err(error) => Err(crate::KbError::Json(format!( "cannot serialize {}: {error}", label ))), } } None => Ok(None), } } #[cfg(test)] mod tests { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; #[derive(Debug)] struct TestHttpServer { url: std::string::String, shutdown_tx: std::option::Option>, } impl TestHttpServer { async fn spawn(server_name: std::string::String) -> Self { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("listener bind must succeed"); let local_addr = listener.local_addr().expect("local addr must exist"); let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let server_name_for_task = server_name.clone(); tokio::spawn(async move { loop { tokio::select! { _ = &mut shutdown_rx => { break; }, accept_result = listener.accept() => { let (mut stream, _peer_addr) = accept_result.expect("accept must succeed"); let local_server_name = server_name_for_task.clone(); tokio::spawn(async move { let mut buffer = vec![0u8; 65536]; let read_result = stream.read(&mut buffer).await; let bytes_read = read_result.expect("read must succeed"); let request_text = std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); let split_result: std::vec::Vec<&str> = request_text.split("\r\n\r\n").collect(); let body = if split_result.len() >= 2 { split_result[1].to_string() } else { std::string::String::new() }; let request_json: serde_json::Value = serde_json::from_str(&body).expect("request body must be valid json"); let method = request_json["method"] .as_str() .expect("method must be a string") .to_string(); let id = request_json["id"].clone(); let response_body = if method == "getHealth" { serde_json::json!({ "jsonrpc": "2.0", "result": format!("ok-{}", local_server_name), "id": id }).to_string() } else if method == "getSlot" { let slot_value = if local_server_name == "server_a" { 111u64 } else { 222u64 }; serde_json::json!({ "jsonrpc": "2.0", "result": slot_value, "id": id }).to_string() } else if method == "sendTransaction" { serde_json::json!({ "jsonrpc": "2.0", "result": format!("sig-{}", local_server_name), "id": id }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", "error": { "code": -32601, "message": "Method not found" }, "id": id }).to_string() }; let response_text = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", response_body.len(), response_body ); let _ = stream.write_all(response_text.as_bytes()).await; let _ = stream.shutdown().await; }); } } } }); Self { url: format!("http://{}", local_addr), shutdown_tx: Some(shutdown_tx), } } async fn shutdown(mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { let _ = shutdown_tx.send(()); } } } fn make_http_endpoint( name: &str, provider: &str, url: std::string::String, roles: std::vec::Vec, ) -> crate::KbHttpEndpointConfig { crate::KbHttpEndpointConfig { name: name.to_string(), enabled: true, provider: provider.to_string(), url, api_key_env_var: None, roles, requests_per_second: 20, burst_capacity: 5, send_transaction_requests_per_second: Some(2), send_transaction_burst_capacity: Some(1), heavy_requests_per_second: Some(1), heavy_burst_capacity: Some(1), connect_timeout_ms: 2000, request_timeout_ms: 2000, max_idle_connections_per_host: 4, max_concurrent_requests_per_endpoint: 2, pause_after_http_429_ms: Some(1500), } } #[tokio::test] async fn pool_selects_only_matching_active_clients() { let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", "http://127.0.0.1:65531".to_string(), vec!["http_queries".to_string()], ); let endpoint_b = make_http_endpoint( "endpoint_b", "provider_b", "http://127.0.0.1:65532".to_string(), vec!["http_transactions".to_string()], ); let endpoint_c = make_http_endpoint( "endpoint_c", "provider_c", "http://127.0.0.1:65533".to_string(), vec!["http_queries".to_string()], ); let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); let client_c = crate::HttpClient::new(endpoint_c).expect("client_c creation must succeed"); client_c.pause_for(5000).await; let pool = crate::HttpEndpointPool::new(vec![client_a.clone(), client_b, client_c]) .expect("pool creation must succeed"); let selected_client = pool .select_client_for_role_and_method("http_queries", "getSlot") .await .expect("selection must succeed"); assert_eq!(selected_client.endpoint_name(), "endpoint_a"); } #[tokio::test] async fn pool_round_robins_between_two_active_clients() { let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", "http://127.0.0.1:65534".to_string(), vec!["http_queries".to_string()], ); let endpoint_b = make_http_endpoint( "endpoint_b", "provider_b", "http://127.0.0.1:65535".to_string(), vec!["http_queries".to_string()], ); let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); let pool = crate::HttpEndpointPool::new(vec![client_a, client_b]) .expect("pool creation must succeed"); let first = pool .select_client_for_role_and_method("http_queries", "getSlot") .await .expect("first selection must succeed"); let second = pool .select_client_for_role_and_method("http_queries", "getSlot") .await .expect("second selection must succeed"); assert_ne!(first.endpoint_name(), second.endpoint_name()); } #[tokio::test] async fn pool_snapshot_reports_statuses() { let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", "http://127.0.0.1:65001".to_string(), vec!["http_queries".to_string()], ); let endpoint_b = make_http_endpoint( "endpoint_b", "provider_b", "http://127.0.0.1:65002".to_string(), vec!["http_queries".to_string()], ); let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); client_b.disable().await; let pool = crate::HttpEndpointPool::new(vec![client_a, client_b]) .expect("pool creation must succeed"); let snapshots = pool.snapshot().await; assert_eq!(snapshots.len(), 2); assert!(snapshots.iter().any(|snapshot| snapshot.status == "Active")); assert!( snapshots .iter() .any(|snapshot| snapshot.status == "Disabled") ); } #[tokio::test] async fn pool_executes_get_health_for_role() { let server_a = TestHttpServer::spawn("server_a".to_string()).await; let server_b = TestHttpServer::spawn("server_b".to_string()).await; let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", server_a.url.clone(), vec!["http_queries".to_string()], ); let endpoint_b = make_http_endpoint( "endpoint_b", "provider_b", server_b.url.clone(), vec!["http_transactions".to_string()], ); let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b]) .expect("pool creation must succeed"); let health = pool .get_health_for_role("http_queries") .await .expect("pool get_health_for_role must succeed"); assert_eq!(health, "ok-server_a".to_string()); server_a.shutdown().await; server_b.shutdown().await; } #[tokio::test] async fn pool_executes_send_transaction_for_role() { let server_a = TestHttpServer::spawn("server_a".to_string()).await; let server_b = TestHttpServer::spawn("server_b".to_string()).await; let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", server_a.url.clone(), vec!["http_queries".to_string()], ); let endpoint_b = make_http_endpoint( "endpoint_b", "provider_b", server_b.url.clone(), vec!["http_transactions".to_string()], ); let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b]) .expect("pool creation must succeed"); let signature = pool .send_transaction_for_role("http_transactions", "AAAA".to_string(), None) .await .expect("pool send_transaction_for_role must succeed"); assert_eq!(signature, "sig-server_b".to_string()); server_a.shutdown().await; server_b.shutdown().await; } #[tokio::test] async fn pool_returns_error_when_no_active_client_matches_role() { let endpoint_a = make_http_endpoint( "endpoint_a", "provider_a", "http://127.0.0.1:65101".to_string(), vec!["http_queries".to_string()], ); let client_a = crate::HttpClient::new(endpoint_a).expect("client creation must succeed"); client_a.disable().await; let pool = crate::HttpEndpointPool::new(vec![client_a]).expect("pool creation must succeed"); let result = pool .select_client_for_role_and_method("http_queries", "getSlot") .await; assert!(result.is_err()); let error = result.expect_err("selection must fail"); match error { crate::KbError::Http(message) => { assert!(message.contains("no active http endpoint available")); } other => { panic!("unexpected error: {other:?}"); } } } }