From 23dab2df8562b9b326ec40f0db7316263d2d60f7 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Wed, 22 Apr 2026 16:01:19 +0200 Subject: [PATCH] 0.4.3 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 27 +- kb_lib/Cargo.toml | 1 + kb_lib/src/config.rs | 2 + kb_lib/src/http_client.rs | 151 +++++++++- kb_lib/src/http_pool.rs | 615 ++++++++++++++++++++++++++++++++++++++ kb_lib/src/lib.rs | 3 + 8 files changed, 779 insertions(+), 23 deletions(-) create mode 100644 kb_lib/src/http_pool.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index dc5afa0..2f7d61c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,3 +14,4 @@ 0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution d’URL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot 0.4.1 - Ajout des premiers helpers HTTP Solana haut niveau, dans la continuité de l’API du client WebSocket 0.4.2 - Préparation de la politique HTTP avancée : états de pause avant envoi, quotas par famille de méthodes et futur pool d’endpoints +0.4.3 - Pool d’endpoints HTTP \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 7011729..27fcf3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.2" +version = "0.4.3" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 98a2874..edbb579 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -306,13 +306,24 @@ Livrables : - distinguer quota RPC général et quota `sendTransaction`, - préparer un futur pool d’endpoints HTTP et l’arbitrage entre eux. -### 0.4.3 — Démo HTTP dans `kb_app` +### 0.4.3 — Pool d’endpoints HTTP +À faire : + +- ajouter un pool d’`HttpClient`, +- sélectionner un endpoint selon le rôle demandé, +- ignorer les endpoints `Paused` ou `Disabled`, +- préparer une rotation simple entre endpoints actifs, +- prendre en compte la classe de méthode HTTP, +- préparer le routage multi-RPC et la limitation de concurrence par endpoint. + +### 0.4.4 — Démo HTTP dans `kb_app` À faire : - ajouter une fenêtre `Demo Http`, - suivre la logique de `Demo Ws`, - permettre de tester les endpoints HTTP configurés, -- afficher les réponses JSON-RPC HTTP et les erreurs associées. +- afficher les réponses JSON-RPC HTTP et les erreurs associées, +- exposer l’état du pool HTTP et les statuts des endpoints sélectionnables. ### 6.12. Version `0.5.x` — Base de données SQLite @@ -485,9 +496,9 @@ Le projet doit maintenir au minimum : La priorité immédiate est désormais la suivante : -1. démarrer la version `0.4.1` avec les helpers HTTP Solana, -2. conserver `HttpClient` comme transport HTTP générique réutilisable, -3. distinguer clairement les helpers raw et typed quand cela est pertinent, -4. préparer la future gestion avancée des quotas HTTP et des états de pause avant envoi, -5. préparer l’introduction d’un pool d’endpoints HTTP, -6. ajouter ensuite une fenêtre `Demo Http` dans `kb_app` sur le modèle de `Demo Ws`. +1. finaliser la version `0.4.3` avec le pool d’endpoints HTTP, +2. exploiter les statuts `Active` / `Paused` / `Disabled` dans la sélection d’endpoint, +3. préparer le routage multi-RPC selon le rôle demandé et la classe de méthode, +4. conserver `HttpClient` comme brique générique réutilisable sous le pool, +5. démarrer ensuite la version `0.4.4` avec une fenêtre `Demo Http` dans `kb_app`, +6. exposer dans `kb_app` les réponses HTTP, les erreurs et l’état du pool. diff --git a/kb_lib/Cargo.toml b/kb_lib/Cargo.toml index 8015f1e..55c8b2b 100644 --- a/kb_lib/Cargo.toml +++ b/kb_lib/Cargo.toml @@ -9,6 +9,7 @@ authors.workspace = true publish.workspace = true [dependencies] +chrono.workspace = true futures-util.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/kb_lib/src/config.rs b/kb_lib/src/config.rs index cda6937..b49b5d8 100644 --- a/kb_lib/src/config.rs +++ b/kb_lib/src/config.rs @@ -418,6 +418,8 @@ pub struct KbHttpEndpointConfig { pub max_idle_connections_per_host: usize, /// Automatic pause duration after an HTTP 429 response, in milliseconds. pub pause_after_http_429_ms: std::option::Option, + /// Maximum number of concurrent in-flight HTTP requests for this endpoint. + pub max_concurrent_requests_per_endpoint: usize, } impl KbHttpEndpointConfig { diff --git a/kb_lib/src/http_client.rs b/kb_lib/src/http_client.rs index 1d448ab..ecb8eae 100644 --- a/kb_lib/src/http_client.rs +++ b/kb_lib/src/http_client.rs @@ -134,7 +134,7 @@ impl KbHttpTokenBucket { } } -#[derive(Debug)] +#[derive(Clone, Debug)] enum KbHttpEndpointLifecycleState { Active, PausedUntil(std::time::Instant), @@ -174,6 +174,7 @@ pub struct HttpClient { client: reqwest::Client, next_request_id: std::sync::Arc, runtime: std::sync::Arc>, + concurrency_limiter: std::sync::Arc, } impl HttpClient { @@ -236,6 +237,12 @@ impl HttpClient { endpoint.name ))); } + if endpoint.max_concurrent_requests_per_endpoint == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have max_concurrent_requests_per_endpoint > 0", + endpoint.name + ))); + } let resolved_url_result = endpoint.resolved_url(); let resolved_url = match resolved_url_result { Ok(resolved_url) => resolved_url, @@ -272,6 +279,9 @@ impl HttpClient { runtime: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpRuntimeState::new( &endpoint, ))), + concurrency_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new( + endpoint.max_concurrent_requests_per_endpoint, + )), }) } @@ -290,6 +300,19 @@ impl HttpClient { &self.endpoint } + /// Returns whether this endpoint supports the requested logical role. + pub fn supports_role(&self, required_role: &str) -> bool { + if required_role.trim().is_empty() { + return true; + } + self.endpoint.roles.iter().any(|role| role == required_role) + } + + /// Returns the currently available concurrency slots for this endpoint. + pub fn available_concurrency_slots(&self) -> usize { + self.concurrency_limiter.available_permits() + } + /// Returns the next request identifier and increments the internal counter. pub fn next_request_id(&self) -> u64 { self.next_request_id @@ -300,12 +323,13 @@ impl HttpClient { pub async fn endpoint_status(&self) -> KbHttpEndpointStatus { let mut runtime_guard = self.runtime.lock().await; kb_http_normalize_runtime_lifecycle(&mut runtime_guard); - match &runtime_guard.lifecycle { + + match runtime_guard.lifecycle.clone() { KbHttpEndpointLifecycleState::Active => KbHttpEndpointStatus::Active, KbHttpEndpointLifecycleState::Disabled => KbHttpEndpointStatus::Disabled, KbHttpEndpointLifecycleState::PausedUntil(deadline) => { let now = std::time::Instant::now(); - if *deadline <= now { + if deadline <= now { runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; KbHttpEndpointStatus::Active } else { @@ -386,6 +410,16 @@ impl HttpClient { Ok(body) => body, Err(error) => return Err(error), }; + let concurrency_permit_result = self.concurrency_limiter.clone().acquire_owned().await; + let _concurrency_permit = match concurrency_permit_result { + Ok(concurrency_permit) => concurrency_permit, + Err(error) => { + return Err(crate::KbError::Http(format!( + "cannot acquire concurrency slot for endpoint '{}' method '{}': {}", + self.endpoint.name, request.method, error + ))); + } + }; tracing::debug!( endpoint_name = %self.endpoint.name, endpoint_url = %self.resolved_url, @@ -411,6 +445,10 @@ impl HttpClient { } }; let status = response.status(); + let retry_after_header = response + .headers() + .get(reqwest::header::RETRY_AFTER) + .cloned(); let text_result = response.text().await; let text = match text_result { Ok(text) => text, @@ -422,7 +460,13 @@ impl HttpClient { } }; if status.as_u16() == 429 { - let pause_duration_ms = self.endpoint.pause_after_http_429_ms.unwrap_or(1500); + let pause_duration_ms = match retry_after_header + .as_ref() + .and_then(kb_http_retry_after_to_pause_ms) + { + Some(retry_after_ms) => retry_after_ms, + None => self.endpoint.pause_after_http_429_ms.unwrap_or(1500), + }; self.pause_for(pause_duration_ms).await; return Err(crate::KbError::Http(format!( "http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'", @@ -750,6 +794,38 @@ impl HttpClient { .await } + /// Raw helper for `sendTransaction`. + pub async fn send_transaction_raw( + &self, + encoded_transaction: std::string::String, + config: std::option::Option, + ) -> Result { + let params = kb_build_first_string_optional_config_params(encoded_transaction, config); + self.execute_json_rpc_result_raw("sendTransaction".to_string(), params) + .await + } + + /// Typed helper for `sendTransaction`. + pub async fn send_transaction( + &self, + encoded_transaction: std::string::String, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_serialize_optional_json_value(config, "sendTransaction config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + let params = + kb_build_first_string_optional_config_params(encoded_transaction, config_value); + self.execute_json_rpc_request_typed::( + "sendTransaction".to_string(), + params, + ) + .await + } + async fn acquire_rate_limit_slot_for_method_class( &self, method_class: KbHttpMethodClass, @@ -758,7 +834,7 @@ impl HttpClient { let wait_duration_option = { let mut runtime_guard = self.runtime.lock().await; kb_http_normalize_runtime_lifecycle(&mut runtime_guard); - match runtime_guard.lifecycle { + match runtime_guard.lifecycle.clone() { KbHttpEndpointLifecycleState::Disabled => { return Err(crate::KbError::Http(format!( "http endpoint '{}' is disabled", @@ -961,13 +1037,10 @@ fn kb_http_effective_burst_capacity( } fn kb_http_normalize_runtime_lifecycle(runtime: &mut KbHttpRuntimeState) { - match runtime.lifecycle { - KbHttpEndpointLifecycleState::PausedUntil(deadline) => { - if deadline <= std::time::Instant::now() { - runtime.lifecycle = KbHttpEndpointLifecycleState::Active; - } + if let KbHttpEndpointLifecycleState::PausedUntil(deadline) = runtime.lifecycle.clone() { + if deadline <= std::time::Instant::now() { + runtime.lifecycle = KbHttpEndpointLifecycleState::Active; } - _ => {} } } @@ -1007,6 +1080,35 @@ fn kb_http_consume_rate_limit_token( Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001))) } +fn kb_http_retry_after_to_pause_ms( + header_value: &reqwest::header::HeaderValue, +) -> std::option::Option { + let header_text_result = header_value.to_str(); + let header_text = match header_text_result { + Ok(header_text) => header_text.trim(), + Err(_) => { + return None; + } + }; + let seconds_result = header_text.parse::(); + if let Ok(seconds) = seconds_result { + return Some(seconds.saturating_mul(1000)); + } + let parsed_date_result = chrono::DateTime::parse_from_rfc2822(header_text); + let parsed_date = match parsed_date_result { + Ok(parsed_date) => parsed_date.with_timezone(&chrono::Utc), + Err(_) => { + return None; + } + }; + let now = chrono::Utc::now(); + let delta_ms = parsed_date.signed_duration_since(now).num_milliseconds(); + if delta_ms <= 0 { + return Some(0); + } + Some(delta_ms as u64) +} + fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String { let char_count = input.chars().count(); if char_count <= max_chars { @@ -1082,7 +1184,7 @@ mod tests { "id": id }).to_string(); let response_text = format!( - "HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + "HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nRetry-After: 1\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", response_body.len(), response_body ); @@ -1251,7 +1353,8 @@ mod tests { connect_timeout_ms: 2000, request_timeout_ms: 2000, max_idle_connections_per_host: 4, - pause_after_http_429_ms: Some(50), + pause_after_http_429_ms: Some(1500), + max_concurrent_requests_per_endpoint: 2, } } @@ -1391,6 +1494,11 @@ mod tests { .await .expect("get_transaction must succeed"); assert!(transaction.is_none()); + let sent_signature = client + .send_transaction("AAAA".to_string(), None) + .await + .expect("send_transaction must succeed"); + assert_eq!(sent_signature, "signature-test".to_string()); let observed_methods = server.observed_methods_snapshot().await; assert!(observed_methods.iter().any(|method| method == "getHealth")); assert!(observed_methods.iter().any(|method| method == "getVersion")); @@ -1416,6 +1524,11 @@ mod tests { .iter() .any(|method| method == "getTransaction") ); + assert!( + observed_methods + .iter() + .any(|method| method == "sendTransaction") + ); server.shutdown().await; } @@ -1424,6 +1537,7 @@ mod tests { let server = TestHttpServer::spawn().await; let endpoint = make_http_endpoint(server.url.clone()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let _ = client .get_health_raw() .await @@ -1467,6 +1581,10 @@ mod tests { ) .await .expect("get_transaction_raw must succeed"); + let _ = client + .send_transaction_raw("AAAA".to_string(), None) + .await + .expect("send_transaction_raw must succeed"); let observed_methods = server.observed_methods_snapshot().await; assert!(observed_methods.iter().any(|method| method == "getHealth")); assert!(observed_methods.iter().any(|method| method == "getVersion")); @@ -1502,6 +1620,11 @@ mod tests { .iter() .any(|method| method == "getTransaction") ); + assert!( + observed_methods + .iter() + .any(|method| method == "sendTransaction") + ); server.shutdown().await; } @@ -1523,7 +1646,7 @@ mod tests { panic!("unexpected status after 429: {other:?}"); } } - tokio::time::sleep(std::time::Duration::from_millis(70)).await; + tokio::time::sleep(std::time::Duration::from_millis(1100)).await; let resumed_status = client.endpoint_status().await; assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active); server.shutdown().await; diff --git a/kb_lib/src/http_pool.rs b/kb_lib/src/http_pool.rs new file mode 100644 index 0000000..3c404a5 --- /dev/null +++ b/kb_lib/src/http_pool.rs @@ -0,0 +1,615 @@ +// file: kb_lib/src/http_pool.rs + +//! HTTP endpoint pool and routing. +//! +//! This module provides a lightweight endpoint pool on top of `HttpClient`. +//! It is responsible for: +//! - filtering endpoints by role +//! - skipping paused or disabled endpoints +//! - simple round-robin selection among active endpoints +//! - preferring endpoints with available concurrency slots + +/// Snapshot of one pooled HTTP endpoint. +#[derive(Clone, Debug, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KbHttpPoolClientSnapshot { + /// Logical endpoint name. + pub endpoint_name: std::string::String, + /// Provider name. + pub provider: std::string::String, + /// Resolved endpoint URL. + pub endpoint_url: std::string::String, + /// Supported roles. + pub roles: std::vec::Vec, + /// Current endpoint status string. + pub status: std::string::String, + /// Remaining pause duration in milliseconds when paused. + pub paused_remaining_ms: std::option::Option, + /// Available concurrency slots. + pub available_concurrency_slots: usize, +} + +/// Pool of HTTP endpoints. +#[derive(Clone, Debug)] +pub struct HttpEndpointPool { + clients: std::vec::Vec, + next_index: std::sync::Arc, +} + +impl HttpEndpointPool { + /// Creates a pool from already constructed clients. + pub fn new(clients: std::vec::Vec) -> Result { + if clients.is_empty() { + return Err(crate::KbError::Config( + "http endpoint pool requires at least one client".to_string(), + )); + } + Ok(Self { + clients, + next_index: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)), + }) + } + + /// Creates a pool from endpoint configurations. + pub fn from_endpoint_configs( + endpoint_configs: std::vec::Vec, + ) -> Result { + let mut clients = std::vec::Vec::new(); + for endpoint in endpoint_configs { + if !endpoint.enabled { + continue; + } + let client_result = crate::HttpClient::new(endpoint); + let client = match client_result { + Ok(client) => client, + Err(error) => return Err(error), + }; + clients.push(client); + } + Self::new(clients) + } + + /// Creates a pool from the global configuration. + pub fn from_config(config: &crate::KbConfig) -> Result { + Self::from_endpoint_configs(config.solana.http_endpoints.clone()) + } + + /// Returns the number of pooled clients. + pub fn len(&self) -> usize { + self.clients.len() + } + + /// Returns whether the pool is empty. + pub fn is_empty(&self) -> bool { + self.clients.is_empty() + } + + /// Returns a live snapshot of pooled endpoints. + pub async fn snapshot(&self) -> std::vec::Vec { + let mut snapshots = std::vec::Vec::new(); + for client in &self.clients { + let status = client.endpoint_status().await; + let (status_text, paused_remaining_ms) = match status { + crate::KbHttpEndpointStatus::Active => ("Active".to_string(), None), + crate::KbHttpEndpointStatus::Disabled => ("Disabled".to_string(), None), + crate::KbHttpEndpointStatus::Paused { remaining_ms } => { + ("Paused".to_string(), Some(remaining_ms)) + } + }; + snapshots.push(KbHttpPoolClientSnapshot { + endpoint_name: client.endpoint_name().to_string(), + provider: client.endpoint_config().provider.clone(), + endpoint_url: client.endpoint_url().to_string(), + roles: client.endpoint_config().roles.clone(), + status: status_text, + paused_remaining_ms, + available_concurrency_slots: client.available_concurrency_slots(), + }); + } + snapshots + } + + /// Selects one client for a role and method. + pub async fn select_client_for_role_and_method( + &self, + required_role: &str, + method: &str, + ) -> Result { + let method_class = crate::HttpClient::classify_method(method); + let _ = method_class; + let mut active_indices = std::vec::Vec::new(); + let mut paused_count = 0usize; + let mut disabled_count = 0usize; + let mut role_mismatch_count = 0usize; + let client_count = self.clients.len(); + let mut index = 0usize; + while index < client_count { + let client = &self.clients[index]; + if !client.supports_role(required_role) { + role_mismatch_count += 1; + index += 1; + continue; + } + let status = client.endpoint_status().await; + match status { + crate::KbHttpEndpointStatus::Active => { + active_indices.push(index); + } + crate::KbHttpEndpointStatus::Paused { .. } => { + paused_count += 1; + } + crate::KbHttpEndpointStatus::Disabled => { + disabled_count += 1; + } + } + index += 1; + } + if active_indices.is_empty() { + return Err(crate::KbError::Http(format!( + "no active http endpoint available for role '{}' and method '{}': paused={}, disabled={}, role_mismatch={}", + required_role, method, paused_count, disabled_count, role_mismatch_count + ))); + } + let rotation_seed = self + .next_index + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let active_len = active_indices.len(); + let mut offset = 0usize; + while offset < active_len { + let active_index = active_indices[(rotation_seed + offset) % active_len]; + let client = &self.clients[active_index]; + if client.available_concurrency_slots() > 0 { + return Ok(client.clone()); + } + offset += 1; + } + let fallback_index = active_indices[rotation_seed % active_len]; + Ok(self.clients[fallback_index].clone()) + } + + /// Executes one raw JSON-RPC request through the pool. + pub async fn execute_json_rpc_result_raw_for_role( + &self, + required_role: &str, + method: std::string::String, + params: std::vec::Vec, + ) -> Result { + let client_result = self + .select_client_for_role_and_method(required_role, &method) + .await; + let client = match client_result { + Ok(client) => client, + Err(error) => return Err(error), + }; + client.execute_json_rpc_result_raw(method, params).await + } + + /// Executes one typed JSON-RPC request through the pool. + pub async fn execute_json_rpc_request_typed_for_role( + &self, + required_role: &str, + method: std::string::String, + params: std::vec::Vec, + ) -> Result + where + T: serde::de::DeserializeOwned, + { + let client_result = self + .select_client_for_role_and_method(required_role, &method) + .await; + let client = match client_result { + Ok(client) => client, + Err(error) => return Err(error), + }; + client + .execute_json_rpc_request_typed::(method, params) + .await + } + /// Executes `getHealth` through the pool. + pub async fn get_health_for_role( + &self, + required_role: &str, + ) -> Result { + self.execute_json_rpc_request_typed_for_role::( + required_role, + "getHealth".to_string(), + std::vec::Vec::new(), + ) + .await + } + + /// Executes `getSlot` through the pool. + pub async fn get_slot_for_role( + &self, + required_role: &str, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_pool_serialize_optional_json_value(config, "pool getSlot config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + let params = kb_pool_build_optional_config_only_params(config_value); + self.execute_json_rpc_request_typed_for_role::( + required_role, + "getSlot".to_string(), + params, + ) + .await + } + + /// Executes `sendTransaction` through the pool. + pub async fn send_transaction_for_role( + &self, + required_role: &str, + encoded_transaction: std::string::String, + config: std::option::Option, + ) -> Result { + let config_value_result = + kb_pool_serialize_optional_json_value(config, "pool sendTransaction config"); + let config_value = match config_value_result { + Ok(config_value) => config_value, + Err(error) => return Err(error), + }; + let params = + kb_pool_build_first_string_optional_config_params(encoded_transaction, config_value); + self.execute_json_rpc_request_typed_for_role::( + required_role, + "sendTransaction".to_string(), + params, + ) + .await + } +} + +fn kb_pool_build_optional_config_only_params( + config: std::option::Option, +) -> std::vec::Vec { + let mut params = std::vec::Vec::new(); + + if let Some(config) = config { + params.push(config); + } + params +} + +fn kb_pool_build_first_string_optional_config_params( + first: std::string::String, + config: std::option::Option, +) -> std::vec::Vec { + let mut params = vec![serde_json::Value::String(first)]; + + if let Some(config) = config { + params.push(config); + } + params +} + +fn kb_pool_serialize_optional_json_value( + value: std::option::Option, + label: &str, +) -> Result, crate::KbError> +where + T: serde::Serialize, +{ + match value { + Some(value) => { + let value_result = serde_json::to_value(value); + match value_result { + Ok(value) => Ok(Some(value)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize {}: {error}", + label + ))), + } + } + None => Ok(None), + } +} + +#[cfg(test)] +mod tests { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + #[derive(Debug)] + struct TestHttpServer { + url: std::string::String, + shutdown_tx: std::option::Option>, + } + + impl TestHttpServer { + async fn spawn(server_name: std::string::String) -> Self { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("listener bind must succeed"); + let local_addr = listener.local_addr().expect("local addr must exist"); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let server_name_for_task = server_name.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + }, + accept_result = listener.accept() => { + let (mut stream, _peer_addr) = accept_result.expect("accept must succeed"); + let local_server_name = server_name_for_task.clone(); + tokio::spawn(async move { + let mut buffer = vec![0u8; 65536]; + let read_result = stream.read(&mut buffer).await; + let bytes_read = read_result.expect("read must succeed"); + let request_text = + std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); + let split_result: std::vec::Vec<&str> = + request_text.split("\r\n\r\n").collect(); + let body = if split_result.len() >= 2 { + split_result[1].to_string() + } else { + std::string::String::new() + }; + let request_json: serde_json::Value = + serde_json::from_str(&body).expect("request body must be valid json"); + let method = request_json["method"] + .as_str() + .expect("method must be a string") + .to_string(); + let id = request_json["id"].clone(); + let response_body = if method == "getHealth" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": format!("ok-{}", local_server_name), + "id": id + }).to_string() + } else if method == "getSlot" { + let slot_value = if local_server_name == "server_a" { + 111u64 + } else { + 222u64 + }; + serde_json::json!({ + "jsonrpc": "2.0", + "result": slot_value, + "id": id + }).to_string() + } else if method == "sendTransaction" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": format!("sig-{}", local_server_name), + "id": id + }).to_string() + } else { + serde_json::json!({ + "jsonrpc": "2.0", + "error": { + "code": -32601, + "message": "Method not found" + }, + "id": id + }).to_string() + }; + let response_text = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + let _ = stream.write_all(response_text.as_bytes()).await; + let _ = stream.shutdown().await; + }); + } + } + } + }); + Self { + url: format!("http://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + } + } + async fn shutdown(mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } + } + + fn make_http_endpoint( + name: &str, + provider: &str, + url: std::string::String, + roles: std::vec::Vec, + ) -> crate::KbHttpEndpointConfig { + crate::KbHttpEndpointConfig { + name: name.to_string(), + enabled: true, + provider: provider.to_string(), + url, + api_key_env_var: None, + roles, + requests_per_second: 20, + burst_capacity: 5, + send_transaction_requests_per_second: Some(2), + send_transaction_burst_capacity: Some(1), + heavy_requests_per_second: Some(1), + heavy_burst_capacity: Some(1), + connect_timeout_ms: 2000, + request_timeout_ms: 2000, + max_idle_connections_per_host: 4, + max_concurrent_requests_per_endpoint: 2, + pause_after_http_429_ms: Some(1500), + } + } + + #[tokio::test] + async fn pool_selects_only_matching_active_clients() { + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + "http://127.0.0.1:65531".to_string(), + vec!["http_queries".to_string()], + ); + let endpoint_b = make_http_endpoint( + "endpoint_b", + "provider_b", + "http://127.0.0.1:65532".to_string(), + vec!["http_transactions".to_string()], + ); + let endpoint_c = make_http_endpoint( + "endpoint_c", + "provider_c", + "http://127.0.0.1:65533".to_string(), + vec!["http_queries".to_string()], + ); + let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); + let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); + let client_c = crate::HttpClient::new(endpoint_c).expect("client_c creation must succeed"); + client_c.pause_for(5000).await; + let pool = crate::HttpEndpointPool::new(vec![client_a.clone(), client_b, client_c]) + .expect("pool creation must succeed"); + let selected_client = pool + .select_client_for_role_and_method("http_queries", "getSlot") + .await + .expect("selection must succeed"); + assert_eq!(selected_client.endpoint_name(), "endpoint_a"); + } + + #[tokio::test] + async fn pool_round_robins_between_two_active_clients() { + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + "http://127.0.0.1:65534".to_string(), + vec!["http_queries".to_string()], + ); + let endpoint_b = make_http_endpoint( + "endpoint_b", + "provider_b", + "http://127.0.0.1:65535".to_string(), + vec!["http_queries".to_string()], + ); + let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); + let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); + let pool = crate::HttpEndpointPool::new(vec![client_a, client_b]) + .expect("pool creation must succeed"); + let first = pool + .select_client_for_role_and_method("http_queries", "getSlot") + .await + .expect("first selection must succeed"); + let second = pool + .select_client_for_role_and_method("http_queries", "getSlot") + .await + .expect("second selection must succeed"); + + assert_ne!(first.endpoint_name(), second.endpoint_name()); + } + + #[tokio::test] + async fn pool_snapshot_reports_statuses() { + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + "http://127.0.0.1:65001".to_string(), + vec!["http_queries".to_string()], + ); + let endpoint_b = make_http_endpoint( + "endpoint_b", + "provider_b", + "http://127.0.0.1:65002".to_string(), + vec!["http_queries".to_string()], + ); + let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed"); + let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed"); + client_b.disable().await; + let pool = crate::HttpEndpointPool::new(vec![client_a, client_b]) + .expect("pool creation must succeed"); + let snapshots = pool.snapshot().await; + assert_eq!(snapshots.len(), 2); + assert!(snapshots.iter().any(|snapshot| snapshot.status == "Active")); + assert!( + snapshots + .iter() + .any(|snapshot| snapshot.status == "Disabled") + ); + } + + #[tokio::test] + async fn pool_executes_get_health_for_role() { + let server_a = TestHttpServer::spawn("server_a".to_string()).await; + let server_b = TestHttpServer::spawn("server_b".to_string()).await; + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + server_a.url.clone(), + vec!["http_queries".to_string()], + ); + let endpoint_b = make_http_endpoint( + "endpoint_b", + "provider_b", + server_b.url.clone(), + vec!["http_transactions".to_string()], + ); + let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b]) + .expect("pool creation must succeed"); + let health = pool + .get_health_for_role("http_queries") + .await + .expect("pool get_health_for_role must succeed"); + assert_eq!(health, "ok-server_a".to_string()); + server_a.shutdown().await; + server_b.shutdown().await; + } + + #[tokio::test] + async fn pool_executes_send_transaction_for_role() { + let server_a = TestHttpServer::spawn("server_a".to_string()).await; + let server_b = TestHttpServer::spawn("server_b".to_string()).await; + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + server_a.url.clone(), + vec!["http_queries".to_string()], + ); + let endpoint_b = make_http_endpoint( + "endpoint_b", + "provider_b", + server_b.url.clone(), + vec!["http_transactions".to_string()], + ); + let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b]) + .expect("pool creation must succeed"); + let signature = pool + .send_transaction_for_role("http_transactions", "AAAA".to_string(), None) + .await + .expect("pool send_transaction_for_role must succeed"); + assert_eq!(signature, "sig-server_b".to_string()); + server_a.shutdown().await; + server_b.shutdown().await; + } + + #[tokio::test] + async fn pool_returns_error_when_no_active_client_matches_role() { + let endpoint_a = make_http_endpoint( + "endpoint_a", + "provider_a", + "http://127.0.0.1:65101".to_string(), + vec!["http_queries".to_string()], + ); + let client_a = crate::HttpClient::new(endpoint_a).expect("client creation must succeed"); + client_a.disable().await; + let pool = + crate::HttpEndpointPool::new(vec![client_a]).expect("pool creation must succeed"); + let result = pool + .select_client_for_role_and_method("http_queries", "getSlot") + .await; + assert!(result.is_err()); + let error = result.expect_err("selection must fail"); + match error { + crate::KbError::Http(message) => { + assert!(message.contains("no active http endpoint available")); + } + other => { + panic!("unexpected error: {other:?}"); + } + } + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index a8d37ea..385d241 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -17,6 +17,7 @@ mod tracing; mod types; mod ws_client; mod rpc_ws_solana; +mod http_pool; pub use crate::config::KbAppConfig; pub use crate::config::KbConfig; @@ -57,4 +58,6 @@ pub use crate::ws_client::WsSubscriptionInfo; pub use crate::rpc_ws_solana::KbSolanaWsTypedNotification; pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification; pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification_from_event; +pub use crate::http_pool::HttpEndpointPool; +pub use crate::http_pool::KbHttpPoolClientSnapshot;