This commit is contained in:
2026-04-22 16:01:19 +02:00
parent 073266a104
commit 23dab2df85
8 changed files with 779 additions and 23 deletions

View File

@@ -14,3 +14,4 @@
0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution dURL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot 0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution dURL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot
0.4.1 - Ajout des premiers helpers HTTP Solana haut niveau, dans la continuité de lAPI du client WebSocket 0.4.1 - Ajout des premiers helpers HTTP Solana haut niveau, dans la continuité de lAPI du client WebSocket
0.4.2 - Préparation de la politique HTTP avancée : états de pause avant envoi, quotas par famille de méthodes et futur pool dendpoints 0.4.2 - Préparation de la politique HTTP avancée : états de pause avant envoi, quotas par famille de méthodes et futur pool dendpoints
0.4.3 - Pool dendpoints HTTP

View File

@@ -8,7 +8,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.4.2" version = "0.4.3"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -306,13 +306,24 @@ Livrables :
- distinguer quota RPC général et quota `sendTransaction`, - distinguer quota RPC général et quota `sendTransaction`,
- préparer un futur pool dendpoints HTTP et larbitrage entre eux. - préparer un futur pool dendpoints HTTP et larbitrage entre eux.
### 0.4.3 — Démo HTTP dans `kb_app` ### 0.4.3 — Pool dendpoints HTTP
À faire :
- ajouter un pool d`HttpClient`,
- sélectionner un endpoint selon le rôle demandé,
- ignorer les endpoints `Paused` ou `Disabled`,
- préparer une rotation simple entre endpoints actifs,
- prendre en compte la classe de méthode HTTP,
- préparer le routage multi-RPC et la limitation de concurrence par endpoint.
### 0.4.4 — Démo HTTP dans `kb_app`
À faire : À faire :
- ajouter une fenêtre `Demo Http`, - ajouter une fenêtre `Demo Http`,
- suivre la logique de `Demo Ws`, - suivre la logique de `Demo Ws`,
- permettre de tester les endpoints HTTP configurés, - permettre de tester les endpoints HTTP configurés,
- afficher les réponses JSON-RPC HTTP et les erreurs associées. - afficher les réponses JSON-RPC HTTP et les erreurs associées,
- exposer létat du pool HTTP et les statuts des endpoints sélectionnables.
### 6.12. Version `0.5.x` — Base de données SQLite ### 6.12. Version `0.5.x` — Base de données SQLite
@@ -485,9 +496,9 @@ Le projet doit maintenir au minimum :
La priorité immédiate est désormais la suivante : La priorité immédiate est désormais la suivante :
1. démarrer la version `0.4.1` avec les helpers HTTP Solana, 1. finaliser la version `0.4.3` avec le pool dendpoints HTTP,
2. conserver `HttpClient` comme transport HTTP générique réutilisable, 2. exploiter les statuts `Active` / `Paused` / `Disabled` dans la sélection dendpoint,
3. distinguer clairement les helpers raw et typed quand cela est pertinent, 3. préparer le routage multi-RPC selon le rôle demandé et la classe de méthode,
4. préparer la future gestion avancée des quotas HTTP et des états de pause avant envoi, 4. conserver `HttpClient` comme brique générique réutilisable sous le pool,
5. préparer lintroduction dun pool dendpoints HTTP, 5. démarrer ensuite la version `0.4.4` avec une fenêtre `Demo Http` dans `kb_app`,
6. ajouter ensuite une fenêtre `Demo Http` dans `kb_app` sur le modèle de `Demo Ws`. 6. exposer dans `kb_app` les réponses HTTP, les erreurs et létat du pool.

View File

@@ -9,6 +9,7 @@ authors.workspace = true
publish.workspace = true publish.workspace = true
[dependencies] [dependencies]
chrono.workspace = true
futures-util.workspace = true futures-util.workspace = true
reqwest.workspace = true reqwest.workspace = true
serde.workspace = true serde.workspace = true

View File

@@ -418,6 +418,8 @@ pub struct KbHttpEndpointConfig {
pub max_idle_connections_per_host: usize, pub max_idle_connections_per_host: usize,
/// Automatic pause duration after an HTTP 429 response, in milliseconds. /// Automatic pause duration after an HTTP 429 response, in milliseconds.
pub pause_after_http_429_ms: std::option::Option<u64>, pub pause_after_http_429_ms: std::option::Option<u64>,
/// Maximum number of concurrent in-flight HTTP requests for this endpoint.
pub max_concurrent_requests_per_endpoint: usize,
} }
impl KbHttpEndpointConfig { impl KbHttpEndpointConfig {

View File

@@ -134,7 +134,7 @@ impl KbHttpTokenBucket {
} }
} }
#[derive(Debug)] #[derive(Clone, Debug)]
enum KbHttpEndpointLifecycleState { enum KbHttpEndpointLifecycleState {
Active, Active,
PausedUntil(std::time::Instant), PausedUntil(std::time::Instant),
@@ -174,6 +174,7 @@ pub struct HttpClient {
client: reqwest::Client, client: reqwest::Client,
next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>, next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
runtime: std::sync::Arc<tokio::sync::Mutex<KbHttpRuntimeState>>, runtime: std::sync::Arc<tokio::sync::Mutex<KbHttpRuntimeState>>,
concurrency_limiter: std::sync::Arc<tokio::sync::Semaphore>,
} }
impl HttpClient { impl HttpClient {
@@ -236,6 +237,12 @@ impl HttpClient {
endpoint.name endpoint.name
))); )));
} }
if endpoint.max_concurrent_requests_per_endpoint == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have max_concurrent_requests_per_endpoint > 0",
endpoint.name
)));
}
let resolved_url_result = endpoint.resolved_url(); let resolved_url_result = endpoint.resolved_url();
let resolved_url = match resolved_url_result { let resolved_url = match resolved_url_result {
Ok(resolved_url) => resolved_url, Ok(resolved_url) => resolved_url,
@@ -272,6 +279,9 @@ impl HttpClient {
runtime: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpRuntimeState::new( runtime: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpRuntimeState::new(
&endpoint, &endpoint,
))), ))),
concurrency_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new(
endpoint.max_concurrent_requests_per_endpoint,
)),
}) })
} }
@@ -290,6 +300,19 @@ impl HttpClient {
&self.endpoint &self.endpoint
} }
/// Returns whether this endpoint supports the requested logical role.
pub fn supports_role(&self, required_role: &str) -> bool {
if required_role.trim().is_empty() {
return true;
}
self.endpoint.roles.iter().any(|role| role == required_role)
}
/// Returns the currently available concurrency slots for this endpoint.
pub fn available_concurrency_slots(&self) -> usize {
self.concurrency_limiter.available_permits()
}
/// Returns the next request identifier and increments the internal counter. /// Returns the next request identifier and increments the internal counter.
pub fn next_request_id(&self) -> u64 { pub fn next_request_id(&self) -> u64 {
self.next_request_id self.next_request_id
@@ -300,12 +323,13 @@ impl HttpClient {
pub async fn endpoint_status(&self) -> KbHttpEndpointStatus { pub async fn endpoint_status(&self) -> KbHttpEndpointStatus {
let mut runtime_guard = self.runtime.lock().await; let mut runtime_guard = self.runtime.lock().await;
kb_http_normalize_runtime_lifecycle(&mut runtime_guard); kb_http_normalize_runtime_lifecycle(&mut runtime_guard);
match &runtime_guard.lifecycle {
match runtime_guard.lifecycle.clone() {
KbHttpEndpointLifecycleState::Active => KbHttpEndpointStatus::Active, KbHttpEndpointLifecycleState::Active => KbHttpEndpointStatus::Active,
KbHttpEndpointLifecycleState::Disabled => KbHttpEndpointStatus::Disabled, KbHttpEndpointLifecycleState::Disabled => KbHttpEndpointStatus::Disabled,
KbHttpEndpointLifecycleState::PausedUntil(deadline) => { KbHttpEndpointLifecycleState::PausedUntil(deadline) => {
let now = std::time::Instant::now(); let now = std::time::Instant::now();
if *deadline <= now { if deadline <= now {
runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active; runtime_guard.lifecycle = KbHttpEndpointLifecycleState::Active;
KbHttpEndpointStatus::Active KbHttpEndpointStatus::Active
} else { } else {
@@ -386,6 +410,16 @@ impl HttpClient {
Ok(body) => body, Ok(body) => body,
Err(error) => return Err(error), Err(error) => return Err(error),
}; };
let concurrency_permit_result = self.concurrency_limiter.clone().acquire_owned().await;
let _concurrency_permit = match concurrency_permit_result {
Ok(concurrency_permit) => concurrency_permit,
Err(error) => {
return Err(crate::KbError::Http(format!(
"cannot acquire concurrency slot for endpoint '{}' method '{}': {}",
self.endpoint.name, request.method, error
)));
}
};
tracing::debug!( tracing::debug!(
endpoint_name = %self.endpoint.name, endpoint_name = %self.endpoint.name,
endpoint_url = %self.resolved_url, endpoint_url = %self.resolved_url,
@@ -411,6 +445,10 @@ impl HttpClient {
} }
}; };
let status = response.status(); let status = response.status();
let retry_after_header = response
.headers()
.get(reqwest::header::RETRY_AFTER)
.cloned();
let text_result = response.text().await; let text_result = response.text().await;
let text = match text_result { let text = match text_result {
Ok(text) => text, Ok(text) => text,
@@ -422,7 +460,13 @@ impl HttpClient {
} }
}; };
if status.as_u16() == 429 { if status.as_u16() == 429 {
let pause_duration_ms = self.endpoint.pause_after_http_429_ms.unwrap_or(1500); let pause_duration_ms = match retry_after_header
.as_ref()
.and_then(kb_http_retry_after_to_pause_ms)
{
Some(retry_after_ms) => retry_after_ms,
None => self.endpoint.pause_after_http_429_ms.unwrap_or(1500),
};
self.pause_for(pause_duration_ms).await; self.pause_for(pause_duration_ms).await;
return Err(crate::KbError::Http(format!( return Err(crate::KbError::Http(format!(
"http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'", "http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'",
@@ -750,6 +794,38 @@ impl HttpClient {
.await .await
} }
/// Raw helper for `sendTransaction`.
pub async fn send_transaction_raw(
&self,
encoded_transaction: std::string::String,
config: std::option::Option<serde_json::Value>,
) -> Result<serde_json::Value, crate::KbError> {
let params = kb_build_first_string_optional_config_params(encoded_transaction, config);
self.execute_json_rpc_result_raw("sendTransaction".to_string(), params)
.await
}
/// Typed helper for `sendTransaction`.
pub async fn send_transaction(
&self,
encoded_transaction: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcSendTransactionConfig>,
) -> Result<std::string::String, crate::KbError> {
let config_value_result =
kb_serialize_optional_json_value(config, "sendTransaction config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
let params =
kb_build_first_string_optional_config_params(encoded_transaction, config_value);
self.execute_json_rpc_request_typed::<std::string::String>(
"sendTransaction".to_string(),
params,
)
.await
}
async fn acquire_rate_limit_slot_for_method_class( async fn acquire_rate_limit_slot_for_method_class(
&self, &self,
method_class: KbHttpMethodClass, method_class: KbHttpMethodClass,
@@ -758,7 +834,7 @@ impl HttpClient {
let wait_duration_option = { let wait_duration_option = {
let mut runtime_guard = self.runtime.lock().await; let mut runtime_guard = self.runtime.lock().await;
kb_http_normalize_runtime_lifecycle(&mut runtime_guard); kb_http_normalize_runtime_lifecycle(&mut runtime_guard);
match runtime_guard.lifecycle { match runtime_guard.lifecycle.clone() {
KbHttpEndpointLifecycleState::Disabled => { KbHttpEndpointLifecycleState::Disabled => {
return Err(crate::KbError::Http(format!( return Err(crate::KbError::Http(format!(
"http endpoint '{}' is disabled", "http endpoint '{}' is disabled",
@@ -961,13 +1037,10 @@ fn kb_http_effective_burst_capacity(
} }
fn kb_http_normalize_runtime_lifecycle(runtime: &mut KbHttpRuntimeState) { fn kb_http_normalize_runtime_lifecycle(runtime: &mut KbHttpRuntimeState) {
match runtime.lifecycle { if let KbHttpEndpointLifecycleState::PausedUntil(deadline) = runtime.lifecycle.clone() {
KbHttpEndpointLifecycleState::PausedUntil(deadline) => { if deadline <= std::time::Instant::now() {
if deadline <= std::time::Instant::now() { runtime.lifecycle = KbHttpEndpointLifecycleState::Active;
runtime.lifecycle = KbHttpEndpointLifecycleState::Active;
}
} }
_ => {}
} }
} }
@@ -1007,6 +1080,35 @@ fn kb_http_consume_rate_limit_token(
Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001))) Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001)))
} }
fn kb_http_retry_after_to_pause_ms(
header_value: &reqwest::header::HeaderValue,
) -> std::option::Option<u64> {
let header_text_result = header_value.to_str();
let header_text = match header_text_result {
Ok(header_text) => header_text.trim(),
Err(_) => {
return None;
}
};
let seconds_result = header_text.parse::<u64>();
if let Ok(seconds) = seconds_result {
return Some(seconds.saturating_mul(1000));
}
let parsed_date_result = chrono::DateTime::parse_from_rfc2822(header_text);
let parsed_date = match parsed_date_result {
Ok(parsed_date) => parsed_date.with_timezone(&chrono::Utc),
Err(_) => {
return None;
}
};
let now = chrono::Utc::now();
let delta_ms = parsed_date.signed_duration_since(now).num_milliseconds();
if delta_ms <= 0 {
return Some(0);
}
Some(delta_ms as u64)
}
fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String { fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String {
let char_count = input.chars().count(); let char_count = input.chars().count();
if char_count <= max_chars { if char_count <= max_chars {
@@ -1082,7 +1184,7 @@ mod tests {
"id": id "id": id
}).to_string(); }).to_string();
let response_text = format!( let response_text = format!(
"HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", "HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nRetry-After: 1\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response_body.len(), response_body.len(),
response_body response_body
); );
@@ -1251,7 +1353,8 @@ mod tests {
connect_timeout_ms: 2000, connect_timeout_ms: 2000,
request_timeout_ms: 2000, request_timeout_ms: 2000,
max_idle_connections_per_host: 4, max_idle_connections_per_host: 4,
pause_after_http_429_ms: Some(50), pause_after_http_429_ms: Some(1500),
max_concurrent_requests_per_endpoint: 2,
} }
} }
@@ -1391,6 +1494,11 @@ mod tests {
.await .await
.expect("get_transaction must succeed"); .expect("get_transaction must succeed");
assert!(transaction.is_none()); assert!(transaction.is_none());
let sent_signature = client
.send_transaction("AAAA".to_string(), None)
.await
.expect("send_transaction must succeed");
assert_eq!(sent_signature, "signature-test".to_string());
let observed_methods = server.observed_methods_snapshot().await; let observed_methods = server.observed_methods_snapshot().await;
assert!(observed_methods.iter().any(|method| method == "getHealth")); assert!(observed_methods.iter().any(|method| method == "getHealth"));
assert!(observed_methods.iter().any(|method| method == "getVersion")); assert!(observed_methods.iter().any(|method| method == "getVersion"));
@@ -1416,6 +1524,11 @@ mod tests {
.iter() .iter()
.any(|method| method == "getTransaction") .any(|method| method == "getTransaction")
); );
assert!(
observed_methods
.iter()
.any(|method| method == "sendTransaction")
);
server.shutdown().await; server.shutdown().await;
} }
@@ -1424,6 +1537,7 @@ mod tests {
let server = TestHttpServer::spawn().await; let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone()); let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let _ = client let _ = client
.get_health_raw() .get_health_raw()
.await .await
@@ -1467,6 +1581,10 @@ mod tests {
) )
.await .await
.expect("get_transaction_raw must succeed"); .expect("get_transaction_raw must succeed");
let _ = client
.send_transaction_raw("AAAA".to_string(), None)
.await
.expect("send_transaction_raw must succeed");
let observed_methods = server.observed_methods_snapshot().await; let observed_methods = server.observed_methods_snapshot().await;
assert!(observed_methods.iter().any(|method| method == "getHealth")); assert!(observed_methods.iter().any(|method| method == "getHealth"));
assert!(observed_methods.iter().any(|method| method == "getVersion")); assert!(observed_methods.iter().any(|method| method == "getVersion"));
@@ -1502,6 +1620,11 @@ mod tests {
.iter() .iter()
.any(|method| method == "getTransaction") .any(|method| method == "getTransaction")
); );
assert!(
observed_methods
.iter()
.any(|method| method == "sendTransaction")
);
server.shutdown().await; server.shutdown().await;
} }
@@ -1523,7 +1646,7 @@ mod tests {
panic!("unexpected status after 429: {other:?}"); panic!("unexpected status after 429: {other:?}");
} }
} }
tokio::time::sleep(std::time::Duration::from_millis(70)).await; tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
let resumed_status = client.endpoint_status().await; let resumed_status = client.endpoint_status().await;
assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active); assert_eq!(resumed_status, crate::KbHttpEndpointStatus::Active);
server.shutdown().await; server.shutdown().await;

615
kb_lib/src/http_pool.rs Normal file
View File

@@ -0,0 +1,615 @@
// file: kb_lib/src/http_pool.rs
//! HTTP endpoint pool and routing.
//!
//! This module provides a lightweight endpoint pool on top of `HttpClient`.
//! It is responsible for:
//! - filtering endpoints by role
//! - skipping paused or disabled endpoints
//! - simple round-robin selection among active endpoints
//! - preferring endpoints with available concurrency slots
/// Snapshot of one pooled HTTP endpoint.
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct KbHttpPoolClientSnapshot {
/// Logical endpoint name.
pub endpoint_name: std::string::String,
/// Provider name.
pub provider: std::string::String,
/// Resolved endpoint URL.
pub endpoint_url: std::string::String,
/// Supported roles.
pub roles: std::vec::Vec<std::string::String>,
/// Current endpoint status string.
pub status: std::string::String,
/// Remaining pause duration in milliseconds when paused.
pub paused_remaining_ms: std::option::Option<u64>,
/// Available concurrency slots.
pub available_concurrency_slots: usize,
}
/// Pool of HTTP endpoints.
#[derive(Clone, Debug)]
pub struct HttpEndpointPool {
clients: std::vec::Vec<crate::HttpClient>,
next_index: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
impl HttpEndpointPool {
/// Creates a pool from already constructed clients.
pub fn new(clients: std::vec::Vec<crate::HttpClient>) -> Result<Self, crate::KbError> {
if clients.is_empty() {
return Err(crate::KbError::Config(
"http endpoint pool requires at least one client".to_string(),
));
}
Ok(Self {
clients,
next_index: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
})
}
/// Creates a pool from endpoint configurations.
pub fn from_endpoint_configs(
endpoint_configs: std::vec::Vec<crate::KbHttpEndpointConfig>,
) -> Result<Self, crate::KbError> {
let mut clients = std::vec::Vec::new();
for endpoint in endpoint_configs {
if !endpoint.enabled {
continue;
}
let client_result = crate::HttpClient::new(endpoint);
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
clients.push(client);
}
Self::new(clients)
}
/// Creates a pool from the global configuration.
pub fn from_config(config: &crate::KbConfig) -> Result<Self, crate::KbError> {
Self::from_endpoint_configs(config.solana.http_endpoints.clone())
}
/// Returns the number of pooled clients.
pub fn len(&self) -> usize {
self.clients.len()
}
/// Returns whether the pool is empty.
pub fn is_empty(&self) -> bool {
self.clients.is_empty()
}
/// Returns a live snapshot of pooled endpoints.
pub async fn snapshot(&self) -> std::vec::Vec<KbHttpPoolClientSnapshot> {
let mut snapshots = std::vec::Vec::new();
for client in &self.clients {
let status = client.endpoint_status().await;
let (status_text, paused_remaining_ms) = match status {
crate::KbHttpEndpointStatus::Active => ("Active".to_string(), None),
crate::KbHttpEndpointStatus::Disabled => ("Disabled".to_string(), None),
crate::KbHttpEndpointStatus::Paused { remaining_ms } => {
("Paused".to_string(), Some(remaining_ms))
}
};
snapshots.push(KbHttpPoolClientSnapshot {
endpoint_name: client.endpoint_name().to_string(),
provider: client.endpoint_config().provider.clone(),
endpoint_url: client.endpoint_url().to_string(),
roles: client.endpoint_config().roles.clone(),
status: status_text,
paused_remaining_ms,
available_concurrency_slots: client.available_concurrency_slots(),
});
}
snapshots
}
/// Selects one client for a role and method.
pub async fn select_client_for_role_and_method(
&self,
required_role: &str,
method: &str,
) -> Result<crate::HttpClient, crate::KbError> {
let method_class = crate::HttpClient::classify_method(method);
let _ = method_class;
let mut active_indices = std::vec::Vec::new();
let mut paused_count = 0usize;
let mut disabled_count = 0usize;
let mut role_mismatch_count = 0usize;
let client_count = self.clients.len();
let mut index = 0usize;
while index < client_count {
let client = &self.clients[index];
if !client.supports_role(required_role) {
role_mismatch_count += 1;
index += 1;
continue;
}
let status = client.endpoint_status().await;
match status {
crate::KbHttpEndpointStatus::Active => {
active_indices.push(index);
}
crate::KbHttpEndpointStatus::Paused { .. } => {
paused_count += 1;
}
crate::KbHttpEndpointStatus::Disabled => {
disabled_count += 1;
}
}
index += 1;
}
if active_indices.is_empty() {
return Err(crate::KbError::Http(format!(
"no active http endpoint available for role '{}' and method '{}': paused={}, disabled={}, role_mismatch={}",
required_role, method, paused_count, disabled_count, role_mismatch_count
)));
}
let rotation_seed = self
.next_index
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let active_len = active_indices.len();
let mut offset = 0usize;
while offset < active_len {
let active_index = active_indices[(rotation_seed + offset) % active_len];
let client = &self.clients[active_index];
if client.available_concurrency_slots() > 0 {
return Ok(client.clone());
}
offset += 1;
}
let fallback_index = active_indices[rotation_seed % active_len];
Ok(self.clients[fallback_index].clone())
}
/// Executes one raw JSON-RPC request through the pool.
pub async fn execute_json_rpc_result_raw_for_role(
&self,
required_role: &str,
method: std::string::String,
params: std::vec::Vec<serde_json::Value>,
) -> Result<serde_json::Value, crate::KbError> {
let client_result = self
.select_client_for_role_and_method(required_role, &method)
.await;
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
client.execute_json_rpc_result_raw(method, params).await
}
/// Executes one typed JSON-RPC request through the pool.
pub async fn execute_json_rpc_request_typed_for_role<T>(
&self,
required_role: &str,
method: std::string::String,
params: std::vec::Vec<serde_json::Value>,
) -> Result<T, crate::KbError>
where
T: serde::de::DeserializeOwned,
{
let client_result = self
.select_client_for_role_and_method(required_role, &method)
.await;
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
client
.execute_json_rpc_request_typed::<T>(method, params)
.await
}
/// Executes `getHealth` through the pool.
pub async fn get_health_for_role(
&self,
required_role: &str,
) -> Result<std::string::String, crate::KbError> {
self.execute_json_rpc_request_typed_for_role::<std::string::String>(
required_role,
"getHealth".to_string(),
std::vec::Vec::new(),
)
.await
}
/// Executes `getSlot` through the pool.
pub async fn get_slot_for_role(
&self,
required_role: &str,
config: std::option::Option<solana_rpc_client_api::config::RpcContextConfig>,
) -> Result<u64, crate::KbError> {
let config_value_result =
kb_pool_serialize_optional_json_value(config, "pool getSlot config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
let params = kb_pool_build_optional_config_only_params(config_value);
self.execute_json_rpc_request_typed_for_role::<u64>(
required_role,
"getSlot".to_string(),
params,
)
.await
}
/// Executes `sendTransaction` through the pool.
pub async fn send_transaction_for_role(
&self,
required_role: &str,
encoded_transaction: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcSendTransactionConfig>,
) -> Result<std::string::String, crate::KbError> {
let config_value_result =
kb_pool_serialize_optional_json_value(config, "pool sendTransaction config");
let config_value = match config_value_result {
Ok(config_value) => config_value,
Err(error) => return Err(error),
};
let params =
kb_pool_build_first_string_optional_config_params(encoded_transaction, config_value);
self.execute_json_rpc_request_typed_for_role::<std::string::String>(
required_role,
"sendTransaction".to_string(),
params,
)
.await
}
}
fn kb_pool_build_optional_config_only_params(
config: std::option::Option<serde_json::Value>,
) -> std::vec::Vec<serde_json::Value> {
let mut params = std::vec::Vec::new();
if let Some(config) = config {
params.push(config);
}
params
}
fn kb_pool_build_first_string_optional_config_params(
first: std::string::String,
config: std::option::Option<serde_json::Value>,
) -> std::vec::Vec<serde_json::Value> {
let mut params = vec![serde_json::Value::String(first)];
if let Some(config) = config {
params.push(config);
}
params
}
fn kb_pool_serialize_optional_json_value<T>(
value: std::option::Option<T>,
label: &str,
) -> Result<std::option::Option<serde_json::Value>, crate::KbError>
where
T: serde::Serialize,
{
match value {
Some(value) => {
let value_result = serde_json::to_value(value);
match value_result {
Ok(value) => Ok(Some(value)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot serialize {}: {error}",
label
))),
}
}
None => Ok(None),
}
}
#[cfg(test)]
mod tests {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
#[derive(Debug)]
struct TestHttpServer {
url: std::string::String,
shutdown_tx: std::option::Option<tokio::sync::oneshot::Sender<()>>,
}
impl TestHttpServer {
async fn spawn(server_name: std::string::String) -> Self {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("listener bind must succeed");
let local_addr = listener.local_addr().expect("local addr must exist");
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let server_name_for_task = server_name.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
},
accept_result = listener.accept() => {
let (mut stream, _peer_addr) = accept_result.expect("accept must succeed");
let local_server_name = server_name_for_task.clone();
tokio::spawn(async move {
let mut buffer = vec![0u8; 65536];
let read_result = stream.read(&mut buffer).await;
let bytes_read = read_result.expect("read must succeed");
let request_text =
std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
let split_result: std::vec::Vec<&str> =
request_text.split("\r\n\r\n").collect();
let body = if split_result.len() >= 2 {
split_result[1].to_string()
} else {
std::string::String::new()
};
let request_json: serde_json::Value =
serde_json::from_str(&body).expect("request body must be valid json");
let method = request_json["method"]
.as_str()
.expect("method must be a string")
.to_string();
let id = request_json["id"].clone();
let response_body = if method == "getHealth" {
serde_json::json!({
"jsonrpc": "2.0",
"result": format!("ok-{}", local_server_name),
"id": id
}).to_string()
} else if method == "getSlot" {
let slot_value = if local_server_name == "server_a" {
111u64
} else {
222u64
};
serde_json::json!({
"jsonrpc": "2.0",
"result": slot_value,
"id": id
}).to_string()
} else if method == "sendTransaction" {
serde_json::json!({
"jsonrpc": "2.0",
"result": format!("sig-{}", local_server_name),
"id": id
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
"error": {
"code": -32601,
"message": "Method not found"
},
"id": id
}).to_string()
};
let response_text = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response_body.len(),
response_body
);
let _ = stream.write_all(response_text.as_bytes()).await;
let _ = stream.shutdown().await;
});
}
}
}
});
Self {
url: format!("http://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
fn make_http_endpoint(
name: &str,
provider: &str,
url: std::string::String,
roles: std::vec::Vec<std::string::String>,
) -> crate::KbHttpEndpointConfig {
crate::KbHttpEndpointConfig {
name: name.to_string(),
enabled: true,
provider: provider.to_string(),
url,
api_key_env_var: None,
roles,
requests_per_second: 20,
burst_capacity: 5,
send_transaction_requests_per_second: Some(2),
send_transaction_burst_capacity: Some(1),
heavy_requests_per_second: Some(1),
heavy_burst_capacity: Some(1),
connect_timeout_ms: 2000,
request_timeout_ms: 2000,
max_idle_connections_per_host: 4,
max_concurrent_requests_per_endpoint: 2,
pause_after_http_429_ms: Some(1500),
}
}
#[tokio::test]
async fn pool_selects_only_matching_active_clients() {
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
"http://127.0.0.1:65531".to_string(),
vec!["http_queries".to_string()],
);
let endpoint_b = make_http_endpoint(
"endpoint_b",
"provider_b",
"http://127.0.0.1:65532".to_string(),
vec!["http_transactions".to_string()],
);
let endpoint_c = make_http_endpoint(
"endpoint_c",
"provider_c",
"http://127.0.0.1:65533".to_string(),
vec!["http_queries".to_string()],
);
let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed");
let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed");
let client_c = crate::HttpClient::new(endpoint_c).expect("client_c creation must succeed");
client_c.pause_for(5000).await;
let pool = crate::HttpEndpointPool::new(vec![client_a.clone(), client_b, client_c])
.expect("pool creation must succeed");
let selected_client = pool
.select_client_for_role_and_method("http_queries", "getSlot")
.await
.expect("selection must succeed");
assert_eq!(selected_client.endpoint_name(), "endpoint_a");
}
#[tokio::test]
async fn pool_round_robins_between_two_active_clients() {
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
"http://127.0.0.1:65534".to_string(),
vec!["http_queries".to_string()],
);
let endpoint_b = make_http_endpoint(
"endpoint_b",
"provider_b",
"http://127.0.0.1:65535".to_string(),
vec!["http_queries".to_string()],
);
let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed");
let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed");
let pool = crate::HttpEndpointPool::new(vec![client_a, client_b])
.expect("pool creation must succeed");
let first = pool
.select_client_for_role_and_method("http_queries", "getSlot")
.await
.expect("first selection must succeed");
let second = pool
.select_client_for_role_and_method("http_queries", "getSlot")
.await
.expect("second selection must succeed");
assert_ne!(first.endpoint_name(), second.endpoint_name());
}
#[tokio::test]
async fn pool_snapshot_reports_statuses() {
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
"http://127.0.0.1:65001".to_string(),
vec!["http_queries".to_string()],
);
let endpoint_b = make_http_endpoint(
"endpoint_b",
"provider_b",
"http://127.0.0.1:65002".to_string(),
vec!["http_queries".to_string()],
);
let client_a = crate::HttpClient::new(endpoint_a).expect("client_a creation must succeed");
let client_b = crate::HttpClient::new(endpoint_b).expect("client_b creation must succeed");
client_b.disable().await;
let pool = crate::HttpEndpointPool::new(vec![client_a, client_b])
.expect("pool creation must succeed");
let snapshots = pool.snapshot().await;
assert_eq!(snapshots.len(), 2);
assert!(snapshots.iter().any(|snapshot| snapshot.status == "Active"));
assert!(
snapshots
.iter()
.any(|snapshot| snapshot.status == "Disabled")
);
}
#[tokio::test]
async fn pool_executes_get_health_for_role() {
let server_a = TestHttpServer::spawn("server_a".to_string()).await;
let server_b = TestHttpServer::spawn("server_b".to_string()).await;
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
server_a.url.clone(),
vec!["http_queries".to_string()],
);
let endpoint_b = make_http_endpoint(
"endpoint_b",
"provider_b",
server_b.url.clone(),
vec!["http_transactions".to_string()],
);
let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b])
.expect("pool creation must succeed");
let health = pool
.get_health_for_role("http_queries")
.await
.expect("pool get_health_for_role must succeed");
assert_eq!(health, "ok-server_a".to_string());
server_a.shutdown().await;
server_b.shutdown().await;
}
#[tokio::test]
async fn pool_executes_send_transaction_for_role() {
let server_a = TestHttpServer::spawn("server_a".to_string()).await;
let server_b = TestHttpServer::spawn("server_b".to_string()).await;
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
server_a.url.clone(),
vec!["http_queries".to_string()],
);
let endpoint_b = make_http_endpoint(
"endpoint_b",
"provider_b",
server_b.url.clone(),
vec!["http_transactions".to_string()],
);
let pool = crate::HttpEndpointPool::from_endpoint_configs(vec![endpoint_a, endpoint_b])
.expect("pool creation must succeed");
let signature = pool
.send_transaction_for_role("http_transactions", "AAAA".to_string(), None)
.await
.expect("pool send_transaction_for_role must succeed");
assert_eq!(signature, "sig-server_b".to_string());
server_a.shutdown().await;
server_b.shutdown().await;
}
#[tokio::test]
async fn pool_returns_error_when_no_active_client_matches_role() {
let endpoint_a = make_http_endpoint(
"endpoint_a",
"provider_a",
"http://127.0.0.1:65101".to_string(),
vec!["http_queries".to_string()],
);
let client_a = crate::HttpClient::new(endpoint_a).expect("client creation must succeed");
client_a.disable().await;
let pool =
crate::HttpEndpointPool::new(vec![client_a]).expect("pool creation must succeed");
let result = pool
.select_client_for_role_and_method("http_queries", "getSlot")
.await;
assert!(result.is_err());
let error = result.expect_err("selection must fail");
match error {
crate::KbError::Http(message) => {
assert!(message.contains("no active http endpoint available"));
}
other => {
panic!("unexpected error: {other:?}");
}
}
}
}

View File

@@ -17,6 +17,7 @@ mod tracing;
mod types; mod types;
mod ws_client; mod ws_client;
mod rpc_ws_solana; mod rpc_ws_solana;
mod http_pool;
pub use crate::config::KbAppConfig; pub use crate::config::KbAppConfig;
pub use crate::config::KbConfig; pub use crate::config::KbConfig;
@@ -57,4 +58,6 @@ pub use crate::ws_client::WsSubscriptionInfo;
pub use crate::rpc_ws_solana::KbSolanaWsTypedNotification; pub use crate::rpc_ws_solana::KbSolanaWsTypedNotification;
pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification; pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification;
pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification_from_event; pub use crate::rpc_ws_solana::parse_kb_solana_ws_typed_notification_from_event;
pub use crate::http_pool::HttpEndpointPool;
pub use crate::http_pool::KbHttpPoolClientSnapshot;