This commit is contained in:
2026-04-22 09:05:16 +02:00
parent a9d26750fa
commit e9bcca21cc
6 changed files with 777 additions and 50 deletions

View File

@@ -11,3 +11,4 @@
0.3.3 - Ajout du suffixe _raw aux helpers raw pour distinguer typed et raw
0.3.4 - Ajout de la fenêtre Demo Ws dans kb_app pour tester les souscriptions live
0.3.5 - Stabilisation de Demo Ws, lecture correcte des endpoints activés depuis la config, limitation/throttling de laffichage UI sous fort débit
0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution dURL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.3.5"
version = "0.4.0"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -265,20 +265,54 @@ Objectif : rendre la fenêtre de démonstration robuste sous flux élevé et coh
- conserver des compteurs et états UI exploitables,
- mieux gérer les fermetures/ralentissements dendpoints publics.
### 6.11. Version `0.4.x` — Transport HTTP générique
### 6.11. Version `0.4.x` — Transport HTTP générique et helpers RPC
Objectif : construire un `HttpClient` clonable et limité.
Objectif : construire un `HttpClient` clonable, limité et extensible, puis ajouter les premiers helpers HTTP Solana.
### 0.4.0 — Socle `HttpClient`
À faire :
- client `reqwest` asynchrone,
- limites req/sec,
- client `reqwest` asynchrone clonable,
- résolution dURL avec support de `api_key_env_var`,
- limiteur local req/sec,
- burst configurable,
- délais configurables,
- profils par endpoint,
- endpoints publics ou API-key,
- abstraction de requêtes JSON-RPC HTTP,
- premiers appels sur endpoints Solana.
- abstraction JSON-RPC HTTP générique,
- premiers appels de validation Solana.
Livrables :
- `HttpClient`,
- enveloppes JSON-RPC HTTP,
- premiers appels :
- `getHealth`
- `getVersion`
- `getSlot`
### 0.4.1 — Helpers HTTP Solana
À faire :
- ajouter des helpers HTTP haut niveau comme pour le client WS,
- distinguer helpers raw et helpers typed quand cela est pertinent,
- couvrir les premières méthodes utiles du RPC HTTP Solana,
- conserver `HttpClient` comme couche générique réutilisable.
### 0.4.2 — Politique HTTP avancée
À faire :
- préparer un état de pause avant envoi pour un endpoint HTTP,
- préparer plusieurs quotas par famille de méthodes,
- distinguer quota RPC général et quota `sendTransaction`,
- préparer un futur pool dendpoints HTTP et larbitrage entre eux.
### 0.4.3 — Démo HTTP dans `kb_app`
À faire :
- ajouter une fenêtre `Demo Http`,
- suivre la logique de `Demo Ws`,
- permettre de tester les endpoints HTTP configurés,
- afficher les réponses JSON-RPC HTTP et les erreurs associées.
### 6.12. Version `0.5.x` — Base de données SQLite
@@ -449,10 +483,11 @@ Le projet doit maintenir au minimum :
## 12. Priorité immédiate
La priorité immédiate est la suivante :
La priorité immédiate est désormais la suivante :
1. stabiliser `Demo Ws`,
2. corriger la lecture/exposition des endpoints activés depuis la config,
3. améliorer la robustesse de lUI sous fort débit,
4. préparer ensuite le transport HTTP générique,
5. poursuivre la structuration des connecteurs DEX.
1. démarrer la version `0.4.1` avec les helpers HTTP Solana,
2. conserver `HttpClient` comme transport HTTP générique réutilisable,
3. distinguer clairement les helpers raw et typed quand cela est pertinent,
4. préparer la future gestion avancée des quotas HTTP et des états de pause avant envoi,
5. préparer lintroduction dun pool dendpoints HTTP,
6. ajouter ensuite une fenêtre `Demo Http` dans `kb_app` sur le modèle de `Demo Ws`.

View File

@@ -166,7 +166,7 @@ impl KbConfig {
Ok(())
}
/// Returns a named HTTP endpoint by reference.
/// Finds one HTTP endpoint by its logical name.
pub fn find_http_endpoint(
&self,
endpoint_name: &str,
@@ -216,9 +216,9 @@ impl KbConfig {
endpoint.name
)));
}
if endpoint.burst == 0 {
if endpoint.burst_capacity == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' burst must be > 0",
"http endpoint '{}' burst_capacity must be > 0",
endpoint.name
)));
}
@@ -384,33 +384,58 @@ pub struct KbSolanaConfig {
/// HTTP endpoint configuration.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KbHttpEndpointConfig {
/// Stable internal endpoint name used by the application.
/// Logical endpoint name.
pub name: std::string::String,
/// Enables or disables the endpoint.
/// Whether this endpoint is enabled.
pub enabled: bool,
/// Provider name such as `solana-public`, `helius`, or `custom`.
/// Provider name.
pub provider: std::string::String,
/// Base HTTP RPC URL.
/// Base HTTP URL.
pub url: std::string::String,
/// Optional environment variable name used to resolve an API key later.
/// Optional environment variable name containing an API key.
pub api_key_env_var: std::option::Option<std::string::String>,
/// Logical roles assigned to this endpoint.
/// Allowed roles for this endpoint.
pub roles: std::vec::Vec<std::string::String>,
/// Allowed average request rate.
/// Requests per second allowed by the local limiter.
pub requests_per_second: u32,
/// Burst capacity for future rate-limiting.
pub burst: u32,
/// HTTP connect timeout in milliseconds.
/// Maximum local burst capacity.
pub burst_capacity: u32,
/// Connect timeout in milliseconds.
pub connect_timeout_ms: u64,
/// HTTP request timeout in milliseconds.
/// Total request timeout in milliseconds.
pub request_timeout_ms: u64,
/// Maximum idle pooled connections per host.
pub max_idle_connections_per_host: usize,
}
impl KbHttpEndpointConfig {
/// Returns the resolved endpoint URL.
/// Returns the resolved URL, replacing an `${ENV_VAR}` placeholder when
/// `api_key_env_var` is configured.
pub fn resolved_url(&self) -> Result<std::string::String, crate::KbError> {
kb_resolve_endpoint_url(&self.url, &self.api_key_env_var)
let env_var_name_option = self.api_key_env_var.as_ref();
let env_var_name = match env_var_name_option {
Some(env_var_name) => env_var_name,
None => {
return Ok(self.url.clone());
}
};
let api_key_result = std::env::var(env_var_name);
let api_key = match api_key_result {
Ok(api_key) => api_key,
Err(error) => {
return Err(crate::KbError::Config(format!(
"cannot resolve api key env var '{}' for http endpoint '{}': {}",
env_var_name, self.name, error
)));
}
};
let placeholder = format!("${{{}}}", env_var_name);
if self.url.contains(&placeholder) {
return Ok(self.url.replace(&placeholder, &api_key));
}
Ok(self.url.clone())
}
}

View File

@@ -1,32 +1,168 @@
// file: kb_lib/src/http_client.rs
//! Generic asynchronous HTTP client skeleton.
//! Generic asynchronous HTTP JSON-RPC client.
//!
//! The transport is intentionally minimal in `0.0.2`. Endpoint binding and
//! client construction are stabilized now, while JSON-RPC request execution,
//! throttling, and batching are scheduled for `0.4.x`.
//! This module provides a reusable `HttpClient` built on top of `reqwest` for
//! Solana RPC HTTP endpoints.
//!
//! Version `0.4.0` keeps the API intentionally small:
//! - reusable async client
//! - endpoint URL resolution via config
//! - local req/sec + burst limiter
//! - generic JSON-RPC 2.0 request/response handling
//! - a few initial Solana validation helpers
/// Generic asynchronous HTTP client placeholder.
/// JSON-RPC 2.0 request envelope for HTTP.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct KbJsonRpcHttpRequest {
/// JSON-RPC version, expected to be `"2.0"`.
pub jsonrpc: std::string::String,
/// Client request identifier.
pub id: serde_json::Value,
/// RPC method name.
pub method: std::string::String,
/// Ordered method parameters.
pub params: std::vec::Vec<serde_json::Value>,
}
impl KbJsonRpcHttpRequest {
/// Creates a new request with a numeric identifier.
pub fn new_with_u64_id(
id: u64,
method: std::string::String,
params: std::vec::Vec<serde_json::Value>,
) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id: serde_json::Value::from(id),
method,
params,
}
}
/// Serializes the request into a compact JSON string.
pub fn to_json_string(&self) -> Result<std::string::String, crate::KbError> {
let text_result = serde_json::to_string(self);
match text_result {
Ok(text) => Ok(text),
Err(error) => Err(crate::KbError::Json(format!(
"cannot serialize http json-rpc request '{}': {error}",
self.method
))),
}
}
}
/// JSON-RPC 2.0 success response.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct KbJsonRpcHttpSuccessResponse {
/// JSON-RPC version, expected to be `"2.0"`.
pub jsonrpc: std::string::String,
/// Result payload.
pub result: serde_json::Value,
/// Request identifier echoed by the server.
pub id: serde_json::Value,
}
/// JSON-RPC 2.0 error object.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct KbJsonRpcHttpErrorObject {
/// Numeric JSON-RPC error code.
pub code: i64,
/// Human-readable error message.
pub message: std::string::String,
/// Optional server-provided payload.
pub data: std::option::Option<serde_json::Value>,
}
/// JSON-RPC 2.0 error response.
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct KbJsonRpcHttpErrorResponse {
/// JSON-RPC version, expected to be `"2.0"`.
pub jsonrpc: std::string::String,
/// Error payload.
pub error: KbJsonRpcHttpErrorObject,
/// Request identifier echoed by the server.
pub id: serde_json::Value,
}
/// Parsed HTTP JSON-RPC response envelope.
#[derive(Clone, Debug, PartialEq)]
pub enum KbJsonRpcHttpResponse {
/// Success response.
Success(KbJsonRpcHttpSuccessResponse),
/// Error response.
Error(KbJsonRpcHttpErrorResponse),
}
#[derive(Debug)]
struct KbHttpTokenBucket {
tokens: f64,
last_refill_at: std::time::Instant,
}
impl KbHttpTokenBucket {
fn new(burst_capacity: u32) -> Self {
Self {
tokens: burst_capacity as f64,
last_refill_at: std::time::Instant::now(),
}
}
}
/// Generic asynchronous HTTP client.
#[derive(Clone, Debug)]
pub struct HttpClient {
endpoint: crate::KbHttpEndpointConfig,
resolved_url: std::string::String,
client: reqwest::Client,
next_request_id: std::sync::Arc<std::sync::atomic::AtomicU64>,
limiter: std::sync::Arc<tokio::sync::Mutex<KbHttpTokenBucket>>,
}
impl HttpClient {
/// Creates a new HTTP client bound to a named endpoint configuration.
/// Creates a new HTTP client bound to one endpoint configuration.
pub fn new(endpoint: crate::KbHttpEndpointConfig) -> Result<Self, crate::KbError> {
if endpoint.name.trim().is_empty() {
return Err(crate::KbError::Config(
"http client endpoint name must not be empty".to_string(),
));
}
if endpoint.requests_per_second == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have requests_per_second > 0",
endpoint.name
)));
}
if endpoint.burst_capacity == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have burst_capacity > 0",
endpoint.name
)));
}
if endpoint.max_idle_connections_per_host == 0 {
return Err(crate::KbError::Config(format!(
"http endpoint '{}' must have max_idle_connections_per_host > 0",
endpoint.name
)));
}
let resolved_url_result = endpoint.resolved_url();
let resolved_url = match resolved_url_result {
Ok(resolved_url) => resolved_url,
Err(error) => return Err(error),
};
let builder = reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_millis(
endpoint.connect_timeout_ms,
))
.timeout(std::time::Duration::from_millis(
endpoint.request_timeout_ms,
))
.pool_max_idle_per_host(endpoint.max_idle_connections_per_host)
.user_agent(format!(
"{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
));
let client_result = builder.build();
let client = match client_result {
@@ -38,36 +174,559 @@ impl HttpClient {
)));
}
};
Ok(Self { endpoint, client })
Ok(Self {
limiter: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpTokenBucket::new(
endpoint.burst_capacity,
))),
endpoint,
resolved_url,
client,
next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
})
}
/// Returns the endpoint name of this client.
/// Returns the endpoint name.
pub fn endpoint_name(&self) -> &str {
&self.endpoint.name
}
/// Returns the endpoint URL of this client.
/// Returns the resolved endpoint URL.
pub fn endpoint_url(&self) -> &str {
&self.endpoint.url
&self.resolved_url
}
/// Returns the endpoint configuration of this client.
/// Returns the endpoint configuration.
pub fn endpoint_config(&self) -> &crate::KbHttpEndpointConfig {
&self.endpoint
}
/// Returns the underlying reqwest client reference.
pub fn raw_client(&self) -> &reqwest::Client {
&self.client
/// Returns the next request identifier and increments the internal counter.
pub fn next_request_id(&self) -> u64 {
self.next_request_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
/// Sends a JSON-RPC payload.
pub async fn send_json_rpc_request(
/// Executes one JSON-RPC request and returns the success envelope.
pub async fn execute_json_rpc_request_raw(
&self,
_payload: &serde_json::Value,
method: std::string::String,
params: std::vec::Vec<serde_json::Value>,
) -> Result<KbJsonRpcHttpSuccessResponse, crate::KbError> {
let request_id = self.next_request_id();
let request = KbJsonRpcHttpRequest::new_with_u64_id(request_id, method, params);
self.execute_json_rpc_request_object(&request).await
}
/// Executes one prebuilt JSON-RPC request object.
pub async fn execute_json_rpc_request_object(
&self,
request: &KbJsonRpcHttpRequest,
) -> Result<KbJsonRpcHttpSuccessResponse, crate::KbError> {
let rate_limit_result = self.acquire_rate_limit_slot().await;
if let Err(error) = rate_limit_result {
return Err(error);
}
let body_result = request.to_json_string();
let body = match body_result {
Ok(body) => body,
Err(error) => return Err(error),
};
tracing::debug!(
endpoint_name = %self.endpoint.name,
endpoint_url = %self.resolved_url,
method = %request.method,
request_id = %request.id,
"sending http json-rpc request"
);
let send_result = self
.client
.post(self.resolved_url.clone())
.header("content-type", "application/json")
.body(body)
.send()
.await;
let response = match send_result {
Ok(response) => response,
Err(error) => {
return Err(crate::KbError::Http(format!(
"http request failed for endpoint '{}' method '{}': {error}",
self.endpoint.name, request.method
)));
}
};
let status = response.status();
let text_result = response.text().await;
let text = match text_result {
Ok(text) => text,
Err(error) => {
return Err(crate::KbError::Http(format!(
"cannot read http response body for endpoint '{}' method '{}': {error}",
self.endpoint.name, request.method
)));
}
};
if !status.is_success() {
return Err(crate::KbError::Http(format!(
"http status {} returned by endpoint '{}' method '{}' body='{}'",
status,
self.endpoint.name,
request.method,
kb_http_shorten_text(&text, 512)
)));
}
let parse_result = parse_kb_json_rpc_http_response_text(&text);
let parsed_response = match parse_result {
Ok(parsed_response) => parsed_response,
Err(error) => return Err(error),
};
match parsed_response {
KbJsonRpcHttpResponse::Success(success_response) => Ok(success_response),
KbJsonRpcHttpResponse::Error(error_response) => Err(crate::KbError::Http(format!(
"json-rpc http error on endpoint '{}' method '{}': code={} message={}",
self.endpoint.name,
request.method,
error_response.error.code,
error_response.error.message
))),
}
}
/// Executes one JSON-RPC request and decodes `result` into `T`.
pub async fn execute_json_rpc_request_typed<T>(
&self,
method: std::string::String,
params: std::vec::Vec<serde_json::Value>,
) -> Result<T, crate::KbError>
where
T: serde::de::DeserializeOwned,
{
let raw_result = self.execute_json_rpc_request_raw(method, params).await;
let raw_response = match raw_result {
Ok(raw_response) => raw_response,
Err(error) => return Err(error),
};
let typed_result = serde_json::from_value::<T>(raw_response.result);
match typed_result {
Ok(value) => Ok(value),
Err(error) => Err(crate::KbError::Json(format!(
"cannot decode typed http json-rpc result: {error}"
))),
}
}
/// Calls `getHealth`.
pub async fn get_health(&self) -> Result<serde_json::Value, crate::KbError> {
let raw_result = self
.execute_json_rpc_request_raw("getHealth".to_string(), std::vec::Vec::new())
.await;
match raw_result {
Ok(response) => Ok(response.result),
Err(error) => Err(error),
}
}
/// Calls `getVersion`.
pub async fn get_version(&self) -> Result<serde_json::Value, crate::KbError> {
let raw_result = self
.execute_json_rpc_request_raw("getVersion".to_string(), std::vec::Vec::new())
.await;
match raw_result {
Ok(response) => Ok(response.result),
Err(error) => Err(error),
}
}
/// Calls `getSlot`.
pub async fn get_slot(
&self,
commitment: std::option::Option<std::string::String>,
) -> Result<serde_json::Value, crate::KbError> {
Err(crate::KbError::NotImplemented(
"HttpClient::send_json_rpc_request is scheduled for version 0.4.x".to_string(),
let mut params = std::vec::Vec::new();
if let Some(commitment) = commitment {
params.push(serde_json::json!({
"commitment": commitment
}));
}
let raw_result = self
.execute_json_rpc_request_raw("getSlot".to_string(), params)
.await;
match raw_result {
Ok(response) => Ok(response.result),
Err(error) => Err(error),
}
}
async fn acquire_rate_limit_slot(&self) -> Result<(), crate::KbError> {
loop {
let wait_duration_option = {
let mut limiter_guard = self.limiter.lock().await;
let now = std::time::Instant::now();
let elapsed_seconds = now
.duration_since(limiter_guard.last_refill_at)
.as_secs_f64();
let replenished_tokens = limiter_guard.tokens
+ elapsed_seconds * self.endpoint.requests_per_second as f64;
let burst_capacity = self.endpoint.burst_capacity as f64;
limiter_guard.tokens = replenished_tokens.min(burst_capacity);
limiter_guard.last_refill_at = now;
if limiter_guard.tokens >= 1.0 {
limiter_guard.tokens -= 1.0;
None
} else {
let missing_tokens = 1.0 - limiter_guard.tokens;
let wait_seconds = missing_tokens / self.endpoint.requests_per_second as f64;
Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001)))
}
};
match wait_duration_option {
Some(wait_duration) => {
tokio::time::sleep(wait_duration).await;
}
None => {
break;
}
}
}
Ok(())
}
}
/// Parses one JSON-RPC HTTP response text.
pub fn parse_kb_json_rpc_http_response_text(
text: &str,
) -> Result<KbJsonRpcHttpResponse, crate::KbError> {
let value_result = serde_json::from_str::<serde_json::Value>(text);
let value = match value_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot parse http json-rpc text: {error}"
)));
}
};
parse_kb_json_rpc_http_response_value(&value)
}
/// Parses one JSON-RPC HTTP response value.
pub fn parse_kb_json_rpc_http_response_value(
value: &serde_json::Value,
) -> Result<KbJsonRpcHttpResponse, crate::KbError> {
let object_option = value.as_object();
let object = match object_option {
Some(object) => object,
None => {
return Err(crate::KbError::Json(
"http json-rpc payload must be a JSON object".to_string(),
));
}
};
let jsonrpc_value_option = object.get("jsonrpc");
let jsonrpc_value = match jsonrpc_value_option {
Some(jsonrpc_value) => jsonrpc_value,
None => {
return Err(crate::KbError::Json(
"http json-rpc payload is missing 'jsonrpc'".to_string(),
));
}
};
let jsonrpc_string_option = jsonrpc_value.as_str();
let jsonrpc_string = match jsonrpc_string_option {
Some(jsonrpc_string) => jsonrpc_string,
None => {
return Err(crate::KbError::Json(
"http json-rpc field 'jsonrpc' must be a string".to_string(),
));
}
};
if jsonrpc_string != "2.0" {
return Err(crate::KbError::Json(format!(
"unsupported http json-rpc version '{}'",
jsonrpc_string
)));
}
let has_result = object.contains_key("result");
let has_error = object.contains_key("error");
let has_id = object.contains_key("id");
if has_id && has_result && !has_error {
let response_result = serde_json::from_value::<KbJsonRpcHttpSuccessResponse>(value.clone());
return match response_result {
Ok(response) => Ok(KbJsonRpcHttpResponse::Success(response)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse http json-rpc success response: {error}"
))),
};
}
if has_id && has_error && !has_result {
let response_result = serde_json::from_value::<KbJsonRpcHttpErrorResponse>(value.clone());
return match response_result {
Ok(response) => Ok(KbJsonRpcHttpResponse::Error(response)),
Err(error) => Err(crate::KbError::Json(format!(
"cannot parse http json-rpc error response: {error}"
))),
};
}
Err(crate::KbError::Json(
"unsupported http json-rpc response shape".to_string(),
))
}
fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String {
let char_count = input.chars().count();
if char_count <= max_chars {
return input.to_string();
}
let shortened: std::string::String = input.chars().take(max_chars).collect();
format!("{shortened} …[truncated {} chars]", char_count - max_chars)
}
#[cfg(test)]
mod tests {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
#[derive(Debug)]
struct TestHttpServer {
url: std::string::String,
shutdown_tx: std::option::Option<tokio::sync::oneshot::Sender<()>>,
observed_methods: std::sync::Arc<tokio::sync::Mutex<std::vec::Vec<std::string::String>>>,
}
impl TestHttpServer {
async fn spawn() -> Self {
let observed_methods =
std::sync::Arc::new(tokio::sync::Mutex::new(std::vec::Vec::new()));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("listener bind must succeed");
let local_addr = listener.local_addr().expect("local addr must exist");
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let observed_methods_for_server = observed_methods.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
},
accept_result = listener.accept() => {
let (mut stream, _peer_addr) = accept_result.expect("accept must succeed");
let observed_methods_for_connection = observed_methods_for_server.clone();
tokio::spawn(async move {
let mut buffer = vec![0u8; 8192];
let read_result = stream.read(&mut buffer).await;
let bytes_read = read_result.expect("read must succeed");
let request_text =
std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
let split_result: std::vec::Vec<&str> =
request_text.split("\r\n\r\n").collect();
let body = if split_result.len() >= 2 {
split_result[1].to_string()
} else {
std::string::String::new()
};
let request_json: serde_json::Value =
serde_json::from_str(&body).expect("request body must be valid json");
let method = request_json["method"]
.as_str()
.expect("method must be a string")
.to_string();
{
let mut observed_methods_guard =
observed_methods_for_connection.lock().await;
observed_methods_guard.push(method.clone());
}
let id = request_json["id"].clone();
let response_body = if method == "getHealth" {
serde_json::json!({
"jsonrpc": "2.0",
"result": "ok",
"id": id
}).to_string()
} else if method == "getVersion" {
serde_json::json!({
"jsonrpc": "2.0",
"result": {
"solana-core": "2.2.3",
"feature-set": 123
},
"id": id
}).to_string()
} else if method == "getSlot" {
serde_json::json!({
"jsonrpc": "2.0",
"result": 424242u64,
"id": id
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
"error": {
"code": -32601,
"message": "Method not found"
},
"id": id
}).to_string()
};
let response_text = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response_body.len(),
response_body
);
let _ = stream.write_all(response_text.as_bytes()).await;
let _ = stream.shutdown().await;
});
}
}
}
});
Self {
url: format!("http://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
observed_methods,
}
}
async fn observed_methods_snapshot(&self) -> std::vec::Vec<std::string::String> {
let observed_methods_guard = self.observed_methods.lock().await;
observed_methods_guard.clone()
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
fn make_http_endpoint(url: std::string::String) -> crate::KbHttpEndpointConfig {
crate::KbHttpEndpointConfig {
name: "test_http".to_string(),
enabled: true,
provider: "test".to_string(),
url,
api_key_env_var: None,
roles: vec!["http_queries".to_string()],
requests_per_second: 20,
burst_capacity: 5,
connect_timeout_ms: 2000,
request_timeout_ms: 2000,
max_idle_connections_per_host: 4,
}
}
#[test]
fn parse_http_success_response_works() {
let parsed = crate::parse_kb_json_rpc_http_response_text(
r#"{"jsonrpc":"2.0","result":"ok","id":1}"#,
)
.expect("parse must succeed");
match parsed {
crate::KbJsonRpcHttpResponse::Success(response) => {
assert_eq!(response.result, serde_json::Value::String("ok".to_string()));
assert_eq!(response.id, serde_json::Value::from(1u64));
}
other => {
panic!("unexpected response: {other:?}");
}
}
}
#[test]
fn parse_http_error_response_works() {
let parsed = crate::parse_kb_json_rpc_http_response_text(
r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#,
)
.expect("parse must succeed");
match parsed {
crate::KbJsonRpcHttpResponse::Error(response) => {
assert_eq!(response.error.code, -32601);
assert_eq!(response.error.message, "Method not found");
}
other => {
panic!("unexpected response: {other:?}");
}
}
}
#[tokio::test]
async fn next_request_id_is_shared_between_clones() {
let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let cloned = client.clone();
assert_eq!(client.next_request_id(), 1);
assert_eq!(cloned.next_request_id(), 2);
assert_eq!(client.next_request_id(), 3);
}
#[tokio::test]
async fn get_health_works() {
let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let result = client.get_health().await.expect("get_health must succeed");
assert_eq!(result, serde_json::Value::String("ok".to_string()));
let observed_methods = server.observed_methods_snapshot().await;
assert!(observed_methods.iter().any(|method| method == "getHealth"));
server.shutdown().await;
}
#[tokio::test]
async fn get_version_works() {
let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let result = client
.get_version()
.await
.expect("get_version must succeed");
assert_eq!(
result["solana-core"],
serde_json::Value::String("2.2.3".to_string())
);
assert_eq!(result["feature-set"], serde_json::Value::from(123u64));
let observed_methods = server.observed_methods_snapshot().await;
assert!(observed_methods.iter().any(|method| method == "getVersion"));
server.shutdown().await;
}
#[tokio::test]
async fn get_slot_works() {
let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let result = client
.get_slot(Some("finalized".to_string()))
.await
.expect("get_slot must succeed");
assert_eq!(result, serde_json::Value::from(424242u64));
let observed_methods = server.observed_methods_snapshot().await;
assert!(observed_methods.iter().any(|method| method == "getSlot"));
server.shutdown().await;
}
#[tokio::test]
async fn unknown_method_returns_error() {
let server = TestHttpServer::spawn().await;
let endpoint = make_http_endpoint(server.url.clone());
let client = crate::HttpClient::new(endpoint).expect("client creation must succeed");
let result = client
.execute_json_rpc_request_raw("unknownMethod".to_string(), std::vec::Vec::new())
.await;
assert!(result.is_err());
let error = result.expect_err("unknown method must fail");
match error {
crate::KbError::Http(message) => {
assert!(message.contains("Method not found"));
}
other => {
panic!("unexpected error: {other:?}");
}
}
server.shutdown().await;
}
}

View File

@@ -38,6 +38,13 @@ pub use crate::rpc_ws::kb_is_probable_json_rpc_object_text;
pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use crate::http_client::HttpClient;
pub use crate::http_client::KbJsonRpcHttpErrorObject;
pub use crate::http_client::KbJsonRpcHttpErrorResponse;
pub use crate::http_client::KbJsonRpcHttpRequest;
pub use crate::http_client::KbJsonRpcHttpResponse;
pub use crate::http_client::KbJsonRpcHttpSuccessResponse;
pub use crate::http_client::parse_kb_json_rpc_http_response_text;
pub use crate::http_client::parse_kb_json_rpc_http_response_value;
pub use crate::tracing::KbTracingGuard;
pub use crate::tracing::init_tracing;
pub use crate::types::KbConnectionState;