From 073266a1040e6bd99bd38b10e7b0316737134fb0 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Wed, 22 Apr 2026 10:28:52 +0200 Subject: [PATCH] 0.4.3 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- kb_lib/src/config.rs | 14 +- kb_lib/src/http_client.rs | 440 +++++++++++++++++++++++++++++++++++--- kb_lib/src/lib.rs | 2 + 5 files changed, 428 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf3c24..dc5afa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,3 +13,4 @@ 0.3.5 - Stabilisation de Demo Ws, lecture correcte des endpoints activés depuis la config, limitation/throttling de l’affichage UI sous fort débit 0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution d’URL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot 0.4.1 - Ajout des premiers helpers HTTP Solana haut niveau, dans la continuité de l’API du client WebSocket +0.4.2 - Préparation de la politique HTTP avancée : états de pause avant envoi, quotas par famille de méthodes et futur pool d’endpoints diff --git a/Cargo.toml b/Cargo.toml index 0d455f8..7011729 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.4.1" +version = "0.4.2" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/kb_lib/src/config.rs b/kb_lib/src/config.rs index e84fffb..cda6937 100644 --- a/kb_lib/src/config.rs +++ b/kb_lib/src/config.rs @@ -398,16 +398,26 @@ pub struct KbHttpEndpointConfig { pub api_key_env_var: std::option::Option, /// Allowed roles for this endpoint. pub roles: std::vec::Vec, - /// Requests per second allowed by the local limiter. + /// Requests per second allowed by the local limiter for general RPC methods. pub requests_per_second: u32, - /// Maximum local burst capacity. + /// Maximum local burst capacity for general RPC methods. pub burst_capacity: u32, + /// Optional requests per second override for `sendTransaction`-class methods. + pub send_transaction_requests_per_second: std::option::Option, + /// Optional burst override for `sendTransaction`-class methods. + pub send_transaction_burst_capacity: std::option::Option, + /// Optional requests per second override for heavy read methods. + pub heavy_requests_per_second: std::option::Option, + /// Optional burst override for heavy read methods. + pub heavy_burst_capacity: std::option::Option, /// Connect timeout in milliseconds. pub connect_timeout_ms: u64, /// Total request timeout in milliseconds. pub request_timeout_ms: u64, /// Maximum idle pooled connections per host. pub max_idle_connections_per_host: usize, + /// Automatic pause duration after an HTTP 429 response, in milliseconds. + pub pause_after_http_429_ms: std::option::Option, } impl KbHttpEndpointConfig { diff --git a/kb_lib/src/http_client.rs b/kb_lib/src/http_client.rs index 9e5253e..1d448ab 100644 --- a/kb_lib/src/http_client.rs +++ b/kb_lib/src/http_client.rs @@ -5,9 +5,11 @@ //! This module provides a reusable `HttpClient` built on top of `reqwest` for //! Solana RPC HTTP endpoints. //! -//! Version `0.4.1` extends the `0.4.0` transport layer with: -//! - raw Solana HTTP helpers -//! - typed Solana HTTP helpers using `solana_rpc_client_api` types +//! Version `0.4.2` extends the `0.4.1` transport layer with: +//! - local endpoint status management (`Active`, `Paused`, `Disabled`) +//! - method classification (`GeneralRpc`, `SendTransaction`, `HeavyRead`) +//! - per-class local rate limiting +//! - automatic pause after HTTP 429 responses /// JSON-RPC 2.0 request envelope for HTTP. #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] @@ -92,6 +94,31 @@ pub enum KbJsonRpcHttpResponse { Error(KbJsonRpcHttpErrorResponse), } +/// Local HTTP method class used for independent limit buckets. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum KbHttpMethodClass { + /// Standard RPC reads and generic methods. + GeneralRpc, + /// Transaction submission methods. + SendTransaction, + /// Resource-intensive read methods. + HeavyRead, +} + +/// Local endpoint status. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum KbHttpEndpointStatus { + /// Endpoint is ready to accept requests. + Active, + /// Endpoint is temporarily paused. + Paused { + /// Remaining pause duration in milliseconds. + remaining_ms: u64, + }, + /// Endpoint is manually disabled. + Disabled, +} + #[derive(Debug)] struct KbHttpTokenBucket { tokens: f64, @@ -107,6 +134,38 @@ impl KbHttpTokenBucket { } } +#[derive(Debug)] +enum KbHttpEndpointLifecycleState { + Active, + PausedUntil(std::time::Instant), + Disabled, +} + +#[derive(Debug)] +struct KbHttpRuntimeState { + lifecycle: KbHttpEndpointLifecycleState, + general_bucket: KbHttpTokenBucket, + send_transaction_bucket: KbHttpTokenBucket, + heavy_read_bucket: KbHttpTokenBucket, +} + +impl KbHttpRuntimeState { + fn new(endpoint: &crate::KbHttpEndpointConfig) -> Self { + Self { + lifecycle: KbHttpEndpointLifecycleState::Active, + general_bucket: KbHttpTokenBucket::new(endpoint.burst_capacity), + send_transaction_bucket: KbHttpTokenBucket::new(kb_http_effective_burst_capacity( + endpoint, + KbHttpMethodClass::SendTransaction, + )), + heavy_read_bucket: KbHttpTokenBucket::new(kb_http_effective_burst_capacity( + endpoint, + KbHttpMethodClass::HeavyRead, + )), + } + } +} + /// Generic asynchronous HTTP client. #[derive(Clone, Debug)] pub struct HttpClient { @@ -114,7 +173,7 @@ pub struct HttpClient { resolved_url: std::string::String, client: reqwest::Client, next_request_id: std::sync::Arc, - limiter: std::sync::Arc>, + runtime: std::sync::Arc>, } impl HttpClient { @@ -137,6 +196,40 @@ impl HttpClient { endpoint.name ))); } + if let Some(send_transaction_requests_per_second) = + endpoint.send_transaction_requests_per_second + { + if send_transaction_requests_per_second == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have send_transaction_requests_per_second > 0 when configured", + endpoint.name + ))); + } + } + if let Some(send_transaction_burst_capacity) = endpoint.send_transaction_burst_capacity { + if send_transaction_burst_capacity == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have send_transaction_burst_capacity > 0 when configured", + endpoint.name + ))); + } + } + if let Some(heavy_requests_per_second) = endpoint.heavy_requests_per_second { + if heavy_requests_per_second == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have heavy_requests_per_second > 0 when configured", + endpoint.name + ))); + } + } + if let Some(heavy_burst_capacity) = endpoint.heavy_burst_capacity { + if heavy_burst_capacity == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have heavy_burst_capacity > 0 when configured", + endpoint.name + ))); + } + } if endpoint.max_idle_connections_per_host == 0 { return Err(crate::KbError::Config(format!( "http endpoint '{}' must have max_idle_connections_per_host > 0", @@ -172,13 +265,13 @@ impl HttpClient { } }; Ok(Self { - limiter: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpTokenBucket::new( - endpoint.burst_capacity, - ))), - endpoint, + endpoint: endpoint.clone(), resolved_url, client, next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)), + runtime: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpRuntimeState::new( + &endpoint, + ))), }) } @@ -203,6 +296,68 @@ impl HttpClient { .fetch_add(1, std::sync::atomic::Ordering::Relaxed) } + /// Returns the current local endpoint status. + pub async fn endpoint_status(&self) -> KbHttpEndpointStatus { + let mut runtime_guard = self.runtime.lock().await; + kb_http_normalize_runtime_lifecycle(&mut runtime_guard); + match &runtime_guard.lifecycle { + KbHttpEndpointLifecycleState::Active => KbHttpEndpointStatus::Active, + KbHttpEndpointLifecycleState::Disabled => KbHttpEndpointStatus::Disabled, + KbHttpEndpointLifecycleState::PausedUntil(deadline) => { + let now = std::time::Instant::now(); + if *deadline <= now { + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; + KbHttpEndpointStatus::Active + } else { + let remaining = deadline.duration_since(now); + let remaining_ms_u128 = remaining.as_millis(); + let remaining_ms = if remaining_ms_u128 > u128::from(u64::MAX) { + u64::MAX + } else { + remaining_ms_u128 as u64 + }; + KbHttpEndpointStatus::Paused { remaining_ms } + } + } + } + } + + /// Pauses this endpoint locally before future sends. + pub async fn pause_for(&self, duration_ms: u64) { + let pause_duration = std::time::Duration::from_millis(duration_ms); + let pause_deadline = std::time::Instant::now() + pause_duration; + let mut runtime_guard = self.runtime.lock().await; + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::PausedUntil(pause_deadline); + } + + /// Resumes this endpoint if it is paused. + pub async fn resume(&self) { + let mut runtime_guard = self.runtime.lock().await; + if matches!( + runtime_guard.lifecycle, + KbHttpEndpointLifecycleState::PausedUntil(_) + ) { + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; + } + } + + /// Disables this endpoint locally. + pub async fn disable(&self) { + let mut runtime_guard = self.runtime.lock().await; + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Disabled; + } + + /// Re-enables this endpoint locally. + pub async fn enable(&self) { + let mut runtime_guard = self.runtime.lock().await; + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; + } + + /// Classifies one HTTP JSON-RPC method. + pub fn classify_method(method: &str) -> KbHttpMethodClass { + kb_http_classify_method(method) + } + /// Executes one JSON-RPC request and returns the success envelope. pub async fn execute_json_rpc_request_raw( &self, @@ -219,7 +374,10 @@ impl HttpClient { &self, request: &KbJsonRpcHttpRequest, ) -> Result { - let rate_limit_result = self.acquire_rate_limit_slot().await; + let method_class = kb_http_classify_method(&request.method); + let rate_limit_result = self + .acquire_rate_limit_slot_for_method_class(method_class) + .await; if let Err(error) = rate_limit_result { return Err(error); } @@ -232,6 +390,7 @@ impl HttpClient { endpoint_name = %self.endpoint.name, endpoint_url = %self.resolved_url, method = %request.method, + method_class = ?method_class, request_id = %request.id, "sending http json-rpc request" ); @@ -262,6 +421,17 @@ impl HttpClient { ))); } }; + if status.as_u16() == 429 { + let pause_duration_ms = self.endpoint.pause_after_http_429_ms.unwrap_or(1500); + self.pause_for(pause_duration_ms).await; + return Err(crate::KbError::Http(format!( + "http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'", + self.endpoint.name, + request.method, + pause_duration_ms, + kb_http_shorten_text(&text, 512) + ))); + } if !status.is_success() { return Err(crate::KbError::Http(format!( "http status {} returned by endpoint '{}' method '{}' body='{}'", @@ -580,26 +750,39 @@ impl HttpClient { .await } - async fn acquire_rate_limit_slot(&self) -> Result<(), crate::KbError> { + async fn acquire_rate_limit_slot_for_method_class( + &self, + method_class: KbHttpMethodClass, + ) -> Result<(), crate::KbError> { loop { let wait_duration_option = { - let mut limiter_guard = self.limiter.lock().await; - let now = std::time::Instant::now(); - let elapsed_seconds = now - .duration_since(limiter_guard.last_refill_at) - .as_secs_f64(); - let replenished_tokens = limiter_guard.tokens - + elapsed_seconds * self.endpoint.requests_per_second as f64; - let burst_capacity = self.endpoint.burst_capacity as f64; - limiter_guard.tokens = replenished_tokens.min(burst_capacity); - limiter_guard.last_refill_at = now; - if limiter_guard.tokens >= 1.0 { - limiter_guard.tokens -= 1.0; - None - } else { - let missing_tokens = 1.0 - limiter_guard.tokens; - let wait_seconds = missing_tokens / self.endpoint.requests_per_second as f64; - Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001))) + let mut runtime_guard = self.runtime.lock().await; + kb_http_normalize_runtime_lifecycle(&mut runtime_guard); + match runtime_guard.lifecycle { + KbHttpEndpointLifecycleState::Disabled => { + return Err(crate::KbError::Http(format!( + "http endpoint '{}' is disabled", + self.endpoint.name + ))); + } + KbHttpEndpointLifecycleState::PausedUntil(deadline) => { + let now = std::time::Instant::now(); + if deadline > now { + Some(deadline.duration_since(now)) + } else { + runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; + kb_http_consume_rate_limit_token( + &self.endpoint, + &mut runtime_guard, + method_class, + ) + } + } + KbHttpEndpointLifecycleState::Active => kb_http_consume_rate_limit_token( + &self.endpoint, + &mut runtime_guard, + method_class, + ), } }; match wait_duration_option { @@ -737,6 +920,93 @@ where } } +fn kb_http_classify_method(method: &str) -> KbHttpMethodClass { + if method == "sendTransaction" || method == "sendRawTransaction" { + return KbHttpMethodClass::SendTransaction; + } + if method == "getProgramAccounts" || method == "getLargestAccounts" { + return KbHttpMethodClass::HeavyRead; + } + KbHttpMethodClass::GeneralRpc +} + +fn kb_http_effective_requests_per_second( + endpoint: &crate::KbHttpEndpointConfig, + method_class: KbHttpMethodClass, +) -> u32 { + match method_class { + KbHttpMethodClass::GeneralRpc => endpoint.requests_per_second, + KbHttpMethodClass::SendTransaction => endpoint + .send_transaction_requests_per_second + .unwrap_or(endpoint.requests_per_second), + KbHttpMethodClass::HeavyRead => endpoint + .heavy_requests_per_second + .unwrap_or(endpoint.requests_per_second), + } +} + +fn kb_http_effective_burst_capacity( + endpoint: &crate::KbHttpEndpointConfig, + method_class: KbHttpMethodClass, +) -> u32 { + match method_class { + KbHttpMethodClass::GeneralRpc => endpoint.burst_capacity, + KbHttpMethodClass::SendTransaction => endpoint + .send_transaction_burst_capacity + .unwrap_or(endpoint.burst_capacity), + KbHttpMethodClass::HeavyRead => endpoint + .heavy_burst_capacity + .unwrap_or(endpoint.burst_capacity), + } +} + +fn kb_http_normalize_runtime_lifecycle(runtime: &mut KbHttpRuntimeState) { + match runtime.lifecycle { + KbHttpEndpointLifecycleState::PausedUntil(deadline) => { + if deadline <= std::time::Instant::now() { + runtime.lifecycle = KbHttpEndpointLifecycleState::Active; + } + } + _ => {} + } +} + +fn kb_http_consume_rate_limit_token( + endpoint: &crate::KbHttpEndpointConfig, + runtime: &mut KbHttpRuntimeState, + method_class: KbHttpMethodClass, +) -> std::option::Option { + let (bucket, requests_per_second, burst_capacity) = match method_class { + KbHttpMethodClass::GeneralRpc => ( + &mut runtime.general_bucket, + kb_http_effective_requests_per_second(endpoint, method_class), + kb_http_effective_burst_capacity(endpoint, method_class), + ), + KbHttpMethodClass::SendTransaction => ( + &mut runtime.send_transaction_bucket, + kb_http_effective_requests_per_second(endpoint, method_class), + kb_http_effective_burst_capacity(endpoint, method_class), + ), + KbHttpMethodClass::HeavyRead => ( + &mut runtime.heavy_read_bucket, + kb_http_effective_requests_per_second(endpoint, method_class), + kb_http_effective_burst_capacity(endpoint, method_class), + ), + }; + let now = std::time::Instant::now(); + let elapsed_seconds = now.duration_since(bucket.last_refill_at).as_secs_f64(); + let replenished_tokens = bucket.tokens + elapsed_seconds * requests_per_second as f64; + bucket.tokens = replenished_tokens.min(burst_capacity as f64); + bucket.last_refill_at = now; + if bucket.tokens >= 1.0 { + bucket.tokens -= 1.0; + return None; + } + let missing_tokens = 1.0 - bucket.tokens; + let wait_seconds = missing_tokens / requests_per_second as f64; + Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001))) +} + fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String { let char_count = input.chars().count(); if char_count <= max_chars { @@ -802,6 +1072,24 @@ mod tests { observed_methods_guard.push(method.clone()); } let id = request_json["id"].clone(); + if method == "rateLimitMe" { + let response_body = serde_json::json!({ + "jsonrpc": "2.0", + "error": { + "code": 42900, + "message": "Too many requests" + }, + "id": id + }).to_string(); + let response_text = format!( + "HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + let _ = stream.write_all(response_text.as_bytes()).await; + let _ = stream.shutdown().await; + return; + } let response_body = if method == "getHealth" { serde_json::json!({ "jsonrpc": "2.0", @@ -899,6 +1187,12 @@ mod tests { "result": [], "id": id }).to_string() + } else if method == "sendTransaction" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": "signature-test", + "id": id + }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", @@ -939,7 +1233,7 @@ mod tests { } } } - + fn make_http_endpoint(url: std::string::String) -> crate::KbHttpEndpointConfig { crate::KbHttpEndpointConfig { name: "test_http".to_string(), @@ -950,9 +1244,14 @@ mod tests { roles: vec!["http_queries".to_string()], requests_per_second: 20, burst_capacity: 5, + send_transaction_requests_per_second: Some(2), + send_transaction_burst_capacity: Some(1), + heavy_requests_per_second: Some(1), + heavy_burst_capacity: Some(1), connect_timeout_ms: 2000, request_timeout_ms: 2000, max_idle_connections_per_host: 4, + pause_after_http_429_ms: Some(50), } } @@ -990,6 +1289,22 @@ mod tests { } } + #[test] + fn classify_method_distinguishes_general_send_and_heavy() { + assert_eq!( + crate::HttpClient::classify_method("getSlot"), + crate::KbHttpMethodClass::GeneralRpc + ); + assert_eq!( + crate::HttpClient::classify_method("sendTransaction"), + crate::KbHttpMethodClass::SendTransaction + ); + assert_eq!( + crate::HttpClient::classify_method("getProgramAccounts"), + crate::KbHttpMethodClass::HeavyRead + ); + } + #[tokio::test] async fn next_request_id_is_shared_between_clones() { let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); @@ -1000,6 +1315,33 @@ mod tests { assert_eq!(client.next_request_id(), 3); } + #[tokio::test] + async fn manual_pause_and_resume_work() { + let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let initial_status = client.endpoint_status().await; + assert_eq!(initial_status, crate::KbHttpEndpointStatus::Active); + client.pause_for(25).await; + let paused_status = client.endpoint_status().await; + match paused_status { + crate::KbHttpEndpointStatus::Paused { remaining_ms } => { + assert!(remaining_ms > 0); + } + other => { + panic!("unexpected status: {other:?}"); + } + } + client.resume().await; + let resumed_status = client.endpoint_status().await; + assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active); + client.disable().await; + let disabled_status = client.endpoint_status().await; + assert_eq!(disabled_status, crate::KbHttpEndpointStatus::Disabled); + client.enable().await; + let enabled_status = client.endpoint_status().await; + assert_eq!(enabled_status, crate::KbHttpEndpointStatus::Active); + } + #[tokio::test] async fn typed_helpers_work_for_basic_methods() { let server = TestHttpServer::spawn().await; @@ -1163,6 +1505,48 @@ mod tests { server.shutdown().await; } + #[tokio::test] + async fn http_429_triggers_local_pause() { + let server = TestHttpServer::spawn().await; + let endpoint = make_http_endpoint(server.url.clone()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let result = client + .execute_json_rpc_request_raw("rateLimitMe".to_string(), std::vec::Vec::new()) + .await; + assert!(result.is_err()); + let paused_status = client.endpoint_status().await; + match paused_status { + crate::KbHttpEndpointStatus::Paused { remaining_ms } => { + assert!(remaining_ms > 0); + } + other => { + panic!("unexpected status after 429: {other:?}"); + } + } + tokio::time::sleep(std::time::Duration::from_millis(70)).await; + let resumed_status = client.endpoint_status().await; + assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active); + server.shutdown().await; + } + + #[tokio::test] + async fn disabled_endpoint_rejects_requests() { + let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + client.disable().await; + let result = client.get_health_raw().await; + assert!(result.is_err()); + let error = result.expect_err("disabled endpoint must reject requests"); + match error { + crate::KbError::Http(message) => { + assert!(message.contains("is disabled")); + } + other => { + panic!("unexpected error: {other:?}"); + } + } + } + #[tokio::test] async fn unknown_method_returns_error() { let server = TestHttpServer::spawn().await; diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 40ee633..a8d37ea 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -43,6 +43,8 @@ pub use crate::http_client::KbJsonRpcHttpErrorResponse; pub use crate::http_client::KbJsonRpcHttpRequest; pub use crate::http_client::KbJsonRpcHttpResponse; pub use crate::http_client::KbJsonRpcHttpSuccessResponse; +pub use crate::http_client::KbHttpEndpointStatus; +pub use crate::http_client::KbHttpMethodClass; pub use crate::http_client::parse_kb_json_rpc_http_response_text; pub use crate::http_client::parse_kb_json_rpc_http_response_value; pub use crate::tracing::KbTracingGuard;