This commit is contained in:
2026-04-22 10:28:52 +02:00
parent ba4a4fdfb7
commit 073266a104
5 changed files with 428 additions and 31 deletions

View File

@@ -13,3 +13,4 @@
0.3.5 - Stabilisation de Demo Ws, lecture correcte des endpoints activés depuis la config, limitation/throttling de laffichage UI sous fort débit
0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution dURL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot
0.4.1 - Ajout des premiers helpers HTTP Solana haut niveau, dans la continuité de lAPI du client WebSocket
0.4.2 - Préparation de la politique HTTP avancée : états de pause avant envoi, quotas par famille de méthodes et futur pool dendpoints

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.4.1"
version = "0.4.2"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -398,16 +398,26 @@ pub struct KbHttpEndpointConfig {
pub api_key_env_var: std::option::Option<std::string::String>,
/// Allowed roles for this endpoint.
pub roles: std::vec::Vec<std::string::String>,
/// Requests per second allowed by the local limiter.
/// Requests per second allowed by the local limiter for general RPC methods.
pub requests_per_second: u32,
/// Maximum local burst capacity.
/// Maximum local burst capacity for general RPC methods.
pub burst_capacity: u32,
/// Optional requests per second override for `sendTransaction`-class methods.
pub send_transaction_requests_per_second: std::option::Option<u32>,
/// Optional burst override for `sendTransaction`-class methods.
pub send_transaction_burst_capacity: std::option::Option<u32>,
/// Optional requests per second override for heavy read methods.
pub heavy_requests_per_second: std::option::Option<u32>,
/// Optional burst override for heavy read methods.
pub heavy_burst_capacity: std::option::Option<u32>,
/// Connect timeout in milliseconds.
pub connect_timeout_ms: u64,
/// Total request timeout in milliseconds.
pub request_timeout_ms: u64,
/// Maximum idle pooled connections per host.
pub max_idle_connections_per_host: usize,
/// Automatic pause duration after an HTTP 429 response, in milliseconds.
pub pause_after_http_429_ms: std::option::Option<u64>,
}
impl KbHttpEndpointConfig {

View File

@@ -5,9 +5,11 @@
//! This module provides a reusable `HttpClient` built on top of `reqwest` for
//! Solana RPC HTTP endpoints.
//!
//! Version `0.4.1` extends the `0.4.0` transport layer with:
//! - raw Solana HTTP helpers
//! - typed Solana HTTP helpers using `solana_rpc_client_api` types
//! Version `0.4.2` extends the `0.4.1` transport layer with:
//! - local endpoint status management (`Active`, `Paused`, `Disabled`)
//! - method classification (`GeneralRpc`, `SendTransaction`, `HeavyRead`)
//! - per-class local rate limiting
//! - automatic pause after HTTP 429 responses
/// JSON-RPC 2.0 request envelope for HTTP.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
@@ -92,6 +94,31 @@ pub enum KbJsonRpcHttpResponse {
Error(KbJsonRpcHttpErrorResponse),
}
/// Local HTTP method class used for independent limit buckets.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KbHttpMethodClass {
/// Standard RPC reads and generic methods.
GeneralRpc,
/// Transaction submission methods.
SendTransaction,
/// Resource-intensive read methods.
HeavyRead,
}
/// Local endpoint status.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum KbHttpEndpointStatus {
/// Endpoint is ready to accept requests.
Active,
/// Endpoint is temporarily paused.
Paused {
/// Remaining pause duration in milliseconds.
remaining_ms: u64,
},
/// Endpoint is manually disabled.
Disabled,
}
#[derive(Debug)]
struct KbHttpTokenBucket {
tokens: f64,
@@ -107,6 +134,38 @@ impl KbHttpTokenBucket {
}
}
#[derive(Debug)]
enum KbHttpEndpointLifecycleState {
Active,
PausedUntil(std::time::Instant),
Disabled,
}
#[derive(Debug)]
struct KbHttpRuntimeState {
lifecycle: KbHttpEndpointLifecycleState,
general_bucket: KbHttpTokenBucket,
send_transaction_bucket: KbHttpTokenBucket,
heavy_read_bucket: KbHttpTokenBucket,
}
impl KbHttpRuntimeState {
fn new(endpoint: &crate::KbHttpEndpointConfig) -> Self {
Self {
lifecycle: KbHttpEndpointLifecycleState::Active,
general_bucket: KbHttpTokenBucket::new(endpoint.burst_capacity),
send_transaction_bucket: KbHttpTokenBucket::new(kb_http_effective_burst_capacity(
endpoint,
KbHttpMethodClass::SendTransaction,
)),
heavy_read_bucket: KbHttpTokenBucket::new(kb_http_effective_burst_capacity(
endpoint,
KbHttpMethodClass::HeavyRead,
)),
}
}
}
/// Generic asynchronous HTTP client.
#[derive(Clone, Debug)]
pub struct HttpClient {
@@ -114,7 +173,7 @@ pub struct HttpClient {
resolved_url: std::string::String,
client: reqwest::Client,
next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
limiter: std::sync::Arc<tokio::sync::Mutex<KbHttpTokenBucket>>,
runtime: std::sync::Arc<tokio::sync::Mutex<KbHttpRuntimeState>>,
}
impl HttpClient {
@@ -137,6 +196,40 @@ impl HttpClient {
endpoint.name
)));
}
if let Some(send_transaction_requests_per_second) =
endpoint.send_transaction_requests_per_second
{
if send_transaction_requests_per_second == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have send_transaction_requests_per_second > 0 when configured",
endpoint.name
)));
}
}
if let Some(send_transaction_burst_capacity) = endpoint.send_transaction_burst_capacity {
if send_transaction_burst_capacity == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have send_transaction_burst_capacity > 0 when configured",
endpoint.name
)));
}
}
if let Some(heavy_requests_per_second) = endpoint.heavy_requests_per_second {
if heavy_requests_per_second == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have heavy_requests_per_second > 0 when configured",
endpoint.name
)));
}
}
if let Some(heavy_burst_capacity) = endpoint.heavy_burst_capacity {
if heavy_burst_capacity == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have heavy_burst_capacity > 0 when configured",
endpoint.name
)));
}
}
if endpoint.max_idle_connections_per_host == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have max_idle_connections_per_host > 0",
@@ -172,13 +265,13 @@ impl HttpClient {
}
};
Ok(Self {
limiter: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpTokenBucket::new(
endpoint.burst_capacity,
))),
endpoint,
endpoint: endpoint.clone(),
resolved_url,
client,
next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpRuntimeState::new(
&endpoint,
))),
})
}
@@ -203,6 +296,68 @@ impl HttpClient {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
/// Returns the current local endpoint status.
pub async fn endpoint_status(&self) -> KbHttpEndpointStatus {
let mut runtime_guard = self.runtime.lock().await;
kb_http_normalize_runtime_lifecycle(&mut runtime_guard);
match &runtime_guard.lifecycle {
KbHttpEndpointLifecycleState::Active => KbHttpEndpointStatus::Active,
KbHttpEndpointLifecycleState::Disabled => KbHttpEndpointStatus::Disabled,
KbHttpEndpointLifecycleState::PausedUntil(deadline) => {
let now = std::time::Instant::now();
if *deadline <= now {
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active;
KbHttpEndpointStatus::Active
} else {
let remaining = deadline.duration_since(now);
let remaining_ms_u128 = remaining.as_millis();
let remaining_ms = if remaining_ms_u128 > u128::from(u64::MAX) {
u64::MAX
} else {
remaining_ms_u128 as u64
};
KbHttpEndpointStatus::Paused { remaining_ms }
}
}
}
}
/// Pauses this endpoint locally before future sends.
pub async fn pause_for(&self, duration_ms: u64) {
let pause_duration = std::time::Duration::from_millis(duration_ms);
let pause_deadline = std::time::Instant::now() + pause_duration;
let mut runtime_guard = self.runtime.lock().await;
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::PausedUntil(pause_deadline);
}
/// Resumes this endpoint if it is paused.
pub async fn resume(&self) {
let mut runtime_guard = self.runtime.lock().await;
if matches!(
runtime_guard.lifecycle,
KbHttpEndpointLifecycleState::PausedUntil(_)
) {
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active;
}
}
/// Disables this endpoint locally.
pub async fn disable(&self) {
let mut runtime_guard = self.runtime.lock().await;
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Disabled;
}
/// Re-enables this endpoint locally.
pub async fn enable(&self) {
let mut runtime_guard = self.runtime.lock().await;
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active;
}
/// Classifies one HTTP JSON-RPC method.
pub fn classify_method(method: &str) -> KbHttpMethodClass {
kb_http_classify_method(method)
}
/// Executes one JSON-RPC request and returns the success envelope.
pub async fn execute_json_rpc_request_raw(
&self,
@@ -219,7 +374,10 @@ impl HttpClient {
&self,
request: &KbJsonRpcHttpRequest,
) -> Result<KbJsonRpcHttpSuccessResponse, crate::KbError> {
let rate_limit_result = self.acquire_rate_limit_slot().await;
let method_class = kb_http_classify_method(&request.method);
let rate_limit_result = self
.acquire_rate_limit_slot_for_method_class(method_class)
.await;
if let Err(error) = rate_limit_result {
return Err(error);
}
@@ -232,6 +390,7 @@ impl HttpClient {
endpoint_name = %self.endpoint.name,
endpoint_url = %self.resolved_url,
method = %request.method,
method_class = ?method_class,
request_id = %request.id,
"sending http json-rpc request"
);
@@ -262,6 +421,17 @@ impl HttpClient {
)));
}
};
if status.as_u16() == 429 {
let pause_duration_ms = self.endpoint.pause_after_http_429_ms.unwrap_or(1500);
self.pause_for(pause_duration_ms).await;
return Err(crate::KbError::Http(format!(
"http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'",
self.endpoint.name,
request.method,
pause_duration_ms,
kb_http_shorten_text(&text, 512)
)));
}
if !status.is_success() {
return Err(crate::KbError::Http(format!(
"http status {} returned by endpoint '{}' method '{}' body='{}'",
@@ -580,26 +750,39 @@ impl HttpClient {
.await
}
async fn acquire_rate_limit_slot(&self) -> Result<(), crate::KbError> {
async fn acquire_rate_limit_slot_for_method_class(
&self,
method_class: KbHttpMethodClass,
) -> Result<(), crate::KbError> {
loop {
let wait_duration_option = {
let mut limiter_guard = self.limiter.lock().await;
let now = std::time::Instant::now();
let elapsed_seconds = now
.duration_since(limiter_guard.last_refill_at)
.as_secs_f64();
let replenished_tokens = limiter_guard.tokens
+ elapsed_seconds * self.endpoint.requests_per_second as f64;
let burst_capacity = self.endpoint.burst_capacity as f64;
limiter_guard.tokens = replenished_tokens.min(burst_capacity);
limiter_guard.last_refill_at = now;
if limiter_guard.tokens >= 1.0 {
limiter_guard.tokens -= 1.0;
None
} else {
let missing_tokens = 1.0 - limiter_guard.tokens;
let wait_seconds = missing_tokens / self.endpoint.requests_per_second as f64;
Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001)))
let mut runtime_guard = self.runtime.lock().await;
kb_http_normalize_runtime_lifecycle(&mut runtime_guard);
match runtime_guard.lifecycle {
KbHttpEndpointLifecycleState::Disabled => {
return Err(crate::KbError::Http(format!(
"http endpoint '{}' is disabled",
self.endpoint.name
)));
}
KbHttpEndpointLifecycleState::PausedUntil(deadline) => {
let now = std::time::Instant::now();
if deadline > now {
Some(deadline.duration_since(now))
} else {
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active;
kb_http_consume_rate_limit_token(
&self.endpoint,
&mut runtime_guard,
method_class,
)
}
}
KbHttpEndpointLifecycleState::Active => kb_http_consume_rate_limit_token(
&self.endpoint,
&mut runtime_guard,
method_class,
),
}
};
match wait_duration_option {
@@ -737,6 +920,93 @@ where
}
}
fn kb_http_classify_method(method: &str) -> KbHttpMethodClass {
if method == "sendTransaction" || method == "sendRawTransaction" {
return KbHttpMethodClass::SendTransaction;
}
if method == "getProgramAccounts" || method == "getLargestAccounts" {
return KbHttpMethodClass::HeavyRead;
}
KbHttpMethodClass::GeneralRpc
}
fn kb_http_effective_requests_per_second(
endpoint: &crate::KbHttpEndpointConfig,
method_class: KbHttpMethodClass,
) -> u32 {
match method_class {
KbHttpMethodClass::GeneralRpc => endpoint.requests_per_second,
KbHttpMethodClass::SendTransaction => endpoint
.send_transaction_requests_per_second
.unwrap_or(endpoint.requests_per_second),
KbHttpMethodClass::HeavyRead => endpoint
.heavy_requests_per_second
.unwrap_or(endpoint.requests_per_second),
}
}
fn kb_http_effective_burst_capacity(
endpoint: &crate::KbHttpEndpointConfig,
method_class: KbHttpMethodClass,
) -> u32 {
match method_class {
KbHttpMethodClass::GeneralRpc => endpoint.burst_capacity,
KbHttpMethodClass::SendTransaction => endpoint
.send_transaction_burst_capacity
.unwrap_or(endpoint.burst_capacity),
KbHttpMethodClass::HeavyRead => endpoint
.heavy_burst_capacity
.unwrap_or(endpoint.burst_capacity),
}
}
fn kb_http_normalize_runtime_lifecycle(runtime: &mut KbHttpRuntimeState) {
match runtime.lifecycle {
KbHttpEndpointLifecycleState::PausedUntil(deadline) => {
if deadline <= std::time::Instant::now() {
runtime.lifecycle = KbHttpEndpointLifecycleState::Active;
}
}
_ => {}
}
}
fn kb_http_consume_rate_limit_token(
endpoint: &crate::KbHttpEndpointConfig,
runtime: &mut KbHttpRuntimeState,
method_class: KbHttpMethodClass,
) -> std::option::Option<std::time::Duration> {
let (bucket, requests_per_second, burst_capacity) = match method_class {
KbHttpMethodClass::GeneralRpc => (
&mut runtime.general_bucket,
kb_http_effective_requests_per_second(endpoint, method_class),
kb_http_effective_burst_capacity(endpoint, method_class),
),
KbHttpMethodClass::SendTransaction => (
&mut runtime.send_transaction_bucket,
kb_http_effective_requests_per_second(endpoint, method_class),
kb_http_effective_burst_capacity(endpoint, method_class),
),
KbHttpMethodClass::HeavyRead => (
&mut runtime.heavy_read_bucket,
kb_http_effective_requests_per_second(endpoint, method_class),
kb_http_effective_burst_capacity(endpoint, method_class),
),
};
let now = std::time::Instant::now();
let elapsed_seconds = now.duration_since(bucket.last_refill_at).as_secs_f64();
let replenished_tokens = bucket.tokens + elapsed_seconds * requests_per_second as f64;
bucket.tokens = replenished_tokens.min(burst_capacity as f64);
bucket.last_refill_at = now;
if bucket.tokens >= 1.0 {
bucket.tokens -= 1.0;
return None;
}
let missing_tokens = 1.0 - bucket.tokens;
let wait_seconds = missing_tokens / requests_per_second as f64;
Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001)))
}
fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String {
let char_count = input.chars().count();
if char_count <= max_chars {
@@ -802,6 +1072,24 @@ mod tests {
observed_methods_guard.push(method.clone());
}
let id = request_json["id"].clone();
if method == "rateLimitMe" {
let response_body = serde_json::json!({
"jsonrpc": "2.0",
"error": {
"code": 42900,
"message": "Too many requests"
},
"id": id
}).to_string();
let response_text = format!(
"HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response_body.len(),
response_body
);
let _ = stream.write_all(response_text.as_bytes()).await;
let _ = stream.shutdown().await;
return;
}
let response_body = if method == "getHealth" {
serde_json::json!({
"jsonrpc": "2.0",
@@ -899,6 +1187,12 @@ mod tests {
"result": [],
"id": id
}).to_string()
} else if method == "sendTransaction" {
serde_json::json!({
"jsonrpc": "2.0",
"result": "signature-test",
"id": id
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
@@ -939,7 +1233,7 @@ mod tests {
}
}
}
fn make_http_endpoint(url: std::string::String) -> crate::KbHttpEndpointConfig {
crate::KbHttpEndpointConfig {
name: "test_http".to_string(),
@@ -950,9 +1244,14 @@ mod tests {
roles: vec!["http_queries".to_string()],
requests_per_second: 20,
burst_capacity: 5,
send_transaction_requests_per_second: Some(2),
send_transaction_burst_capacity: Some(1),
heavy_requests_per_second: Some(1),
heavy_burst_capacity: Some(1),
connect_timeout_ms: 2000,
request_timeout_ms: 2000,
max_idle_connections_per_host: 4,
pause_after_http_429_ms: Some(50),
}
}
@@ -990,6 +1289,22 @@ mod tests {
}
}
#[test]
fn classify_method_distinguishes_general_send_and_heavy() {
assert_eq!(
crate::HttpClient::classify_method("getSlot"),
crate::KbHttpMethodClass::GeneralRpc
);
assert_eq!(
crate::HttpClient::classify_method("sendTransaction"),
crate::KbHttpMethodClass::SendTransaction
);
assert_eq!(
crate::HttpClient::classify_method("getProgramAccounts"),
crate::KbHttpMethodClass::HeavyRead
);
}
#[tokio::test]
async fn next_request_id_is_shared_between_clones() {
let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string());
@@ -1000,6 +1315,33 @@ mod tests {
assert_eq!(client.next_request_id(), 3);
}
#[tokio::test]
async fn manual_pause_and_resume_work() {
let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let initial_status = client.endpoint_status().await;
assert_eq!(initial_status, crate::KbHttpEndpointStatus::Active);
client.pause_for(25).await;
let paused_status = client.endpoint_status().await;
match paused_status {
crate::KbHttpEndpointStatus::Paused { remaining_ms } => {
assert!(remaining_ms > 0);
}
other => {
panic!("unexpected status: {other:?}");
}
}
client.resume().await;
let resumed_status = client.endpoint_status().await;
assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active);
client.disable().await;
let disabled_status = client.endpoint_status().await;
assert_eq!(disabled_status, crate::KbHttpEndpointStatus::Disabled);
client.enable().await;
let enabled_status = client.endpoint_status().await;
assert_eq!(enabled_status, crate::KbHttpEndpointStatus::Active);
}
#[tokio::test]
async fn typed_helpers_work_for_basic_methods() {
let server = TestHttpServer::spawn().await;
@@ -1163,6 +1505,48 @@ mod tests {
server.shutdown().await;
}
#[tokio::test]
async fn http_429_triggers_local_pause() {
let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let result = client
.execute_json_rpc_request_raw("rateLimitMe".to_string(), std::vec::Vec::new())
.await;
assert!(result.is_err());
let paused_status = client.endpoint_status().await;
match paused_status {
crate::KbHttpEndpointStatus::Paused { remaining_ms } => {
assert!(remaining_ms > 0);
}
other => {
panic!("unexpected status after 429: {other:?}");
}
}
tokio::time::sleep(std::time::Duration::from_millis(70)).await;
let resumed_status = client.endpoint_status().await;
assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active);
server.shutdown().await;
}
#[tokio::test]
async fn disabled_endpoint_rejects_requests() {
let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
client.disable().await;
let result = client.get_health_raw().await;
assert!(result.is_err());
let error = result.expect_err("disabled endpoint must reject requests");
match error {
crate::KbError::Http(message) => {
assert!(message.contains("is disabled"));
}
other => {
panic!("unexpected error: {other:?}");
}
}
}
#[tokio::test]
async fn unknown_method_returns_error() {
let server = TestHttpServer::spawn().await;

View File

@@ -43,6 +43,8 @@ pub use crate::http_client::KbJsonRpcHttpErrorResponse;
pub use crate::http_client::KbJsonRpcHttpRequest;
pub use crate::http_client::KbJsonRpcHttpResponse;
pub use crate::http_client::KbJsonRpcHttpSuccessResponse;
pub use crate::http_client::KbHttpEndpointStatus;
pub use crate::http_client::KbHttpMethodClass;
pub use crate::http_client::parse_kb_json_rpc_http_response_text;
pub use crate::http_client::parse_kb_json_rpc_http_response_value;
pub use crate::tracing::KbTracingGuard;