diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dd04aa..5916ef0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,3 +11,4 @@ 0.3.3 - Ajout du suffixe _raw aux helpers raw pour distinguer typed et raw 0.3.4 - Ajout de la fenêtre Demo Ws dans kb_app pour tester les souscriptions live 0.3.5 - Stabilisation de Demo Ws, lecture correcte des endpoints activés depuis la config, limitation/throttling de l’affichage UI sous fort débit +0.4.0 - Socle HttpClient générique async clonable, JSON-RPC HTTP 2.0, résolution d’URL avec api_key_env_var, limiteur local req/sec + burst, helpers initiaux getHealth/getVersion/getSlot diff --git a/Cargo.toml b/Cargo.toml index 8bc7147..1701e7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.3.5" +version = "0.4.0" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index e3967df..98a2874 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -265,20 +265,54 @@ Objectif : rendre la fenêtre de démonstration robuste sous flux élevé et coh - conserver des compteurs et états UI exploitables, - mieux gérer les fermetures/ralentissements d’endpoints publics. -### 6.11. Version `0.4.x` — Transport HTTP générique +### 6.11. Version `0.4.x` — Transport HTTP générique et helpers RPC -Objectif : construire un `HttpClient` clonable et limité. +Objectif : construire un `HttpClient` clonable, limité et extensible, puis ajouter les premiers helpers HTTP Solana. +### 0.4.0 — Socle `HttpClient` À faire : -- client `reqwest` asynchrone, -- limites req/sec, +- client `reqwest` asynchrone clonable, +- résolution d’URL avec support de `api_key_env_var`, +- limiteur local req/sec, - burst configurable, - délais configurables, - profils par endpoint, -- endpoints publics ou API-key, -- abstraction de requêtes JSON-RPC HTTP, -- premiers appels sur endpoints Solana. +- abstraction JSON-RPC HTTP générique, +- premiers appels de validation Solana. + +Livrables : + +- `HttpClient`, +- enveloppes JSON-RPC HTTP, +- premiers appels : + - `getHealth` + - `getVersion` + - `getSlot` + +### 0.4.1 — Helpers HTTP Solana +À faire : + +- ajouter des helpers HTTP haut niveau comme pour le client WS, +- distinguer helpers raw et helpers typed quand cela est pertinent, +- couvrir les premières méthodes utiles du RPC HTTP Solana, +- conserver `HttpClient` comme couche générique réutilisable. + +### 0.4.2 — Politique HTTP avancée +À faire : + +- préparer un état de pause avant envoi pour un endpoint HTTP, +- préparer plusieurs quotas par famille de méthodes, +- distinguer quota RPC général et quota `sendTransaction`, +- préparer un futur pool d’endpoints HTTP et l’arbitrage entre eux. + +### 0.4.3 — Démo HTTP dans `kb_app` +À faire : + +- ajouter une fenêtre `Demo Http`, +- suivre la logique de `Demo Ws`, +- permettre de tester les endpoints HTTP configurés, +- afficher les réponses JSON-RPC HTTP et les erreurs associées. ### 6.12. Version `0.5.x` — Base de données SQLite @@ -449,10 +483,11 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate -La priorité immédiate est la suivante : +La priorité immédiate est désormais la suivante : -1. stabiliser `Demo Ws`, -2. corriger la lecture/exposition des endpoints activés depuis la config, -3. améliorer la robustesse de l’UI sous fort débit, -4. préparer ensuite le transport HTTP générique, -5. poursuivre la structuration des connecteurs DEX. +1. démarrer la version `0.4.1` avec les helpers HTTP Solana, +2. conserver `HttpClient` comme transport HTTP générique réutilisable, +3. distinguer clairement les helpers raw et typed quand cela est pertinent, +4. préparer la future gestion avancée des quotas HTTP et des états de pause avant envoi, +5. préparer l’introduction d’un pool d’endpoints HTTP, +6. ajouter ensuite une fenêtre `Demo Http` dans `kb_app` sur le modèle de `Demo Ws`. diff --git a/kb_lib/src/config.rs b/kb_lib/src/config.rs index e01cf28..e84fffb 100644 --- a/kb_lib/src/config.rs +++ b/kb_lib/src/config.rs @@ -166,7 +166,7 @@ impl KbConfig { Ok(()) } - /// Returns a named HTTP endpoint by reference. + /// Finds one HTTP endpoint by its logical name. pub fn find_http_endpoint( &self, endpoint_name: &str, @@ -216,9 +216,9 @@ impl KbConfig { endpoint.name ))); } - if endpoint.burst == 0 { + if endpoint.burst_capacity == 0 { return Err(crate::KbError::Config(format!( - "http endpoint '{}' burst must be > 0", + "http endpoint '{}' burst_capacity must be > 0", endpoint.name ))); } @@ -384,33 +384,58 @@ pub struct KbSolanaConfig { /// HTTP endpoint configuration. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] pub struct KbHttpEndpointConfig { - /// Stable internal endpoint name used by the application. + /// Logical endpoint name. pub name: std::string::String, - /// Enables or disables the endpoint. + /// Whether this endpoint is enabled. pub enabled: bool, - /// Provider name such as `solana-public`, `helius`, or `custom`. + /// Provider name. pub provider: std::string::String, - /// Base HTTP RPC URL. + /// Base HTTP URL. pub url: std::string::String, - /// Optional environment variable name used to resolve an API key later. + /// Optional environment variable name containing an API key. pub api_key_env_var: std::option::Option, - /// Logical roles assigned to this endpoint. + /// Allowed roles for this endpoint. pub roles: std::vec::Vec, - /// Allowed average request rate. + /// Requests per second allowed by the local limiter. pub requests_per_second: u32, - /// Burst capacity for future rate-limiting. - pub burst: u32, - /// HTTP connect timeout in milliseconds. + /// Maximum local burst capacity. + pub burst_capacity: u32, + /// Connect timeout in milliseconds. pub connect_timeout_ms: u64, - /// HTTP request timeout in milliseconds. + /// Total request timeout in milliseconds. pub request_timeout_ms: u64, + /// Maximum idle pooled connections per host. + pub max_idle_connections_per_host: usize, } impl KbHttpEndpointConfig { - /// Returns the resolved endpoint URL. + /// Returns the resolved URL, replacing an `${ENV_VAR}` placeholder when + /// `api_key_env_var` is configured. pub fn resolved_url(&self) -> Result { - kb_resolve_endpoint_url(&self.url, &self.api_key_env_var) + let env_var_name_option = self.api_key_env_var.as_ref(); + let env_var_name = match env_var_name_option { + Some(env_var_name) => env_var_name, + None => { + return Ok(self.url.clone()); + } + }; + let api_key_result = std::env::var(env_var_name); + let api_key = match api_key_result { + Ok(api_key) => api_key, + Err(error) => { + return Err(crate::KbError::Config(format!( + "cannot resolve api key env var '{}' for http endpoint '{}': {}", + env_var_name, self.name, error + ))); + } + }; + let placeholder = format!("${{{}}}", env_var_name); + if self.url.contains(&placeholder) { + return Ok(self.url.replace(&placeholder, &api_key)); + } + Ok(self.url.clone()) } } diff --git a/kb_lib/src/http_client.rs b/kb_lib/src/http_client.rs index 528bd13..30194cc 100644 --- a/kb_lib/src/http_client.rs +++ b/kb_lib/src/http_client.rs @@ -1,32 +1,168 @@ // file: kb_lib/src/http_client.rs -//! Generic asynchronous HTTP client skeleton. +//! Generic asynchronous HTTP JSON-RPC client. //! -//! The transport is intentionally minimal in `0.0.2`. Endpoint binding and -//! client construction are stabilized now, while JSON-RPC request execution, -//! throttling, and batching are scheduled for `0.4.x`. +//! This module provides a reusable `HttpClient` built on top of `reqwest` for +//! Solana RPC HTTP endpoints. +//! +//! Version `0.4.0` keeps the API intentionally small: +//! - reusable async client +//! - endpoint URL resolution via config +//! - local req/sec + burst limiter +//! - generic JSON-RPC 2.0 request/response handling +//! - a few initial Solana validation helpers -/// Generic asynchronous HTTP client placeholder. +/// JSON-RPC 2.0 request envelope for HTTP. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcHttpRequest { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Client request identifier. + pub id: serde_json::Value, + /// RPC method name. + pub method: std::string::String, + /// Ordered method parameters. + pub params: std::vec::Vec, +} + +impl KbJsonRpcHttpRequest { + /// Creates a new request with a numeric identifier. + pub fn new_with_u64_id( + id: u64, + method: std::string::String, + params: std::vec::Vec, + ) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id: serde_json::Value::from(id), + method, + params, + } + } + + /// Serializes the request into a compact JSON string. + pub fn to_json_string(&self) -> Result { + let text_result = serde_json::to_string(self); + match text_result { + Ok(text) => Ok(text), + Err(error) => Err(crate::KbError::Json(format!( + "cannot serialize http json-rpc request '{}': {error}", + self.method + ))), + } + } +} + +/// JSON-RPC 2.0 success response. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcHttpSuccessResponse { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Result payload. + pub result: serde_json::Value, + /// Request identifier echoed by the server. + pub id: serde_json::Value, +} + +/// JSON-RPC 2.0 error object. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcHttpErrorObject { + /// Numeric JSON-RPC error code. + pub code: i64, + /// Human-readable error message. + pub message: std::string::String, + /// Optional server-provided payload. + pub data: std::option::Option, +} + +/// JSON-RPC 2.0 error response. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct KbJsonRpcHttpErrorResponse { + /// JSON-RPC version, expected to be `"2.0"`. + pub jsonrpc: std::string::String, + /// Error payload. + pub error: KbJsonRpcHttpErrorObject, + /// Request identifier echoed by the server. + pub id: serde_json::Value, +} + +/// Parsed HTTP JSON-RPC response envelope. +#[derive(Clone, Debug, PartialEq)] +pub enum KbJsonRpcHttpResponse { + /// Success response. + Success(KbJsonRpcHttpSuccessResponse), + /// Error response. + Error(KbJsonRpcHttpErrorResponse), +} + +#[derive(Debug)] +struct KbHttpTokenBucket { + tokens: f64, + last_refill_at: std::time::Instant, +} + +impl KbHttpTokenBucket { + fn new(burst_capacity: u32) -> Self { + Self { + tokens: burst_capacity as f64, + last_refill_at: std::time::Instant::now(), + } + } +} + +/// Generic asynchronous HTTP client. #[derive(Clone, Debug)] pub struct HttpClient { endpoint: crate::KbHttpEndpointConfig, + resolved_url: std::string::String, client: reqwest::Client, + next_request_id: std::sync::Arc, + limiter: std::sync::Arc>, } impl HttpClient { - /// Creates a new HTTP client bound to a named endpoint configuration. + /// Creates a new HTTP client bound to one endpoint configuration. pub fn new(endpoint: crate::KbHttpEndpointConfig) -> Result { if endpoint.name.trim().is_empty() { return Err(crate::KbError::Config( "http client endpoint name must not be empty".to_string(), )); } + if endpoint.requests_per_second == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have requests_per_second > 0", + endpoint.name + ))); + } + if endpoint.burst_capacity == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have burst_capacity > 0", + endpoint.name + ))); + } + if endpoint.max_idle_connections_per_host == 0 { + return Err(crate::KbError::Config(format!( + "http endpoint '{}' must have max_idle_connections_per_host > 0", + endpoint.name + ))); + } + let resolved_url_result = endpoint.resolved_url(); + let resolved_url = match resolved_url_result { + Ok(resolved_url) => resolved_url, + Err(error) => return Err(error), + }; let builder = reqwest::Client::builder() .connect_timeout(std::time::Duration::from_millis( endpoint.connect_timeout_ms, )) .timeout(std::time::Duration::from_millis( endpoint.request_timeout_ms, + )) + .pool_max_idle_per_host(endpoint.max_idle_connections_per_host) + .user_agent(format!( + "{}/{}", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION") )); let client_result = builder.build(); let client = match client_result { @@ -38,36 +174,559 @@ impl HttpClient { ))); } }; - Ok(Self { endpoint, client }) + Ok(Self { + limiter: std::sync::Arc::new(tokio::sync::Mutex::new(KbHttpTokenBucket::new( + endpoint.burst_capacity, + ))), + endpoint, + resolved_url, + client, + next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)), + }) } - /// Returns the endpoint name of this client. + /// Returns the endpoint name. pub fn endpoint_name(&self) -> &str { &self.endpoint.name } - /// Returns the endpoint URL of this client. + /// Returns the resolved endpoint URL. pub fn endpoint_url(&self) -> &str { - &self.endpoint.url + &self.resolved_url } - /// Returns the endpoint configuration of this client. + /// Returns the endpoint configuration. pub fn endpoint_config(&self) -> &crate::KbHttpEndpointConfig { &self.endpoint } - /// Returns the underlying reqwest client reference. - pub fn raw_client(&self) -> &reqwest::Client { - &self.client + /// Returns the next request identifier and increments the internal counter. + pub fn next_request_id(&self) -> u64 { + self.next_request_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) } - /// Sends a JSON-RPC payload. - pub async fn send_json_rpc_request( + /// Executes one JSON-RPC request and returns the success envelope. + pub async fn execute_json_rpc_request_raw( &self, - _payload: &serde_json::Value, + method: std::string::String, + params: std::vec::Vec, + ) -> Result { + let request_id = self.next_request_id(); + let request = KbJsonRpcHttpRequest::new_with_u64_id(request_id, method, params); + self.execute_json_rpc_request_object(&request).await + } + + /// Executes one prebuilt JSON-RPC request object. + pub async fn execute_json_rpc_request_object( + &self, + request: &KbJsonRpcHttpRequest, + ) -> Result { + let rate_limit_result = self.acquire_rate_limit_slot().await; + if let Err(error) = rate_limit_result { + return Err(error); + } + let body_result = request.to_json_string(); + let body = match body_result { + Ok(body) => body, + Err(error) => return Err(error), + }; + tracing::debug!( + endpoint_name = %self.endpoint.name, + endpoint_url = %self.resolved_url, + method = %request.method, + request_id = %request.id, + "sending http json-rpc request" + ); + let send_result = self + .client + .post(self.resolved_url.clone()) + .header("content-type", "application/json") + .body(body) + .send() + .await; + let response = match send_result { + Ok(response) => response, + Err(error) => { + return Err(crate::KbError::Http(format!( + "http request failed for endpoint '{}' method '{}': {error}", + self.endpoint.name, request.method + ))); + } + }; + let status = response.status(); + let text_result = response.text().await; + let text = match text_result { + Ok(text) => text, + Err(error) => { + return Err(crate::KbError::Http(format!( + "cannot read http response body for endpoint '{}' method '{}': {error}", + self.endpoint.name, request.method + ))); + } + }; + if !status.is_success() { + return Err(crate::KbError::Http(format!( + "http status {} returned by endpoint '{}' method '{}' body='{}'", + status, + self.endpoint.name, + request.method, + kb_http_shorten_text(&text, 512) + ))); + } + let parse_result = parse_kb_json_rpc_http_response_text(&text); + let parsed_response = match parse_result { + Ok(parsed_response) => parsed_response, + Err(error) => return Err(error), + }; + match parsed_response { + KbJsonRpcHttpResponse::Success(success_response) => Ok(success_response), + KbJsonRpcHttpResponse::Error(error_response) => Err(crate::KbError::Http(format!( + "json-rpc http error on endpoint '{}' method '{}': code={} message={}", + self.endpoint.name, + request.method, + error_response.error.code, + error_response.error.message + ))), + } + } + + /// Executes one JSON-RPC request and decodes `result` into `T`. + pub async fn execute_json_rpc_request_typed( + &self, + method: std::string::String, + params: std::vec::Vec, + ) -> Result + where + T: serde::de::DeserializeOwned, + { + let raw_result = self.execute_json_rpc_request_raw(method, params).await; + let raw_response = match raw_result { + Ok(raw_response) => raw_response, + Err(error) => return Err(error), + }; + let typed_result = serde_json::from_value::(raw_response.result); + match typed_result { + Ok(value) => Ok(value), + Err(error) => Err(crate::KbError::Json(format!( + "cannot decode typed http json-rpc result: {error}" + ))), + } + } + + /// Calls `getHealth`. + pub async fn get_health(&self) -> Result { + let raw_result = self + .execute_json_rpc_request_raw("getHealth".to_string(), std::vec::Vec::new()) + .await; + match raw_result { + Ok(response) => Ok(response.result), + Err(error) => Err(error), + } + } + + /// Calls `getVersion`. + pub async fn get_version(&self) -> Result { + let raw_result = self + .execute_json_rpc_request_raw("getVersion".to_string(), std::vec::Vec::new()) + .await; + match raw_result { + Ok(response) => Ok(response.result), + Err(error) => Err(error), + } + } + + /// Calls `getSlot`. + pub async fn get_slot( + &self, + commitment: std::option::Option, ) -> Result { - Err(crate::KbError::NotImplemented( - "HttpClient::send_json_rpc_request is scheduled for version 0.4.x".to_string(), - )) + let mut params = std::vec::Vec::new(); + if let Some(commitment) = commitment { + params.push(serde_json::json!({ + "commitment": commitment + })); + } + let raw_result = self + .execute_json_rpc_request_raw("getSlot".to_string(), params) + .await; + match raw_result { + Ok(response) => Ok(response.result), + Err(error) => Err(error), + } + } + + async fn acquire_rate_limit_slot(&self) -> Result<(), crate::KbError> { + loop { + let wait_duration_option = { + let mut limiter_guard = self.limiter.lock().await; + let now = std::time::Instant::now(); + let elapsed_seconds = now + .duration_since(limiter_guard.last_refill_at) + .as_secs_f64(); + let replenished_tokens = limiter_guard.tokens + + elapsed_seconds * self.endpoint.requests_per_second as f64; + let burst_capacity = self.endpoint.burst_capacity as f64; + limiter_guard.tokens = replenished_tokens.min(burst_capacity); + limiter_guard.last_refill_at = now; + if limiter_guard.tokens >= 1.0 { + limiter_guard.tokens -= 1.0; + None + } else { + let missing_tokens = 1.0 - limiter_guard.tokens; + let wait_seconds = missing_tokens / self.endpoint.requests_per_second as f64; + Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001))) + } + }; + match wait_duration_option { + Some(wait_duration) => { + tokio::time::sleep(wait_duration).await; + } + None => { + break; + } + } + } + Ok(()) + } +} + +/// Parses one JSON-RPC HTTP response text. +pub fn parse_kb_json_rpc_http_response_text( + text: &str, +) -> Result { + let value_result = serde_json::from_str::(text); + let value = match value_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse http json-rpc text: {error}" + ))); + } + }; + parse_kb_json_rpc_http_response_value(&value) +} + +/// Parses one JSON-RPC HTTP response value. +pub fn parse_kb_json_rpc_http_response_value( + value: &serde_json::Value, +) -> Result { + let object_option = value.as_object(); + let object = match object_option { + Some(object) => object, + None => { + return Err(crate::KbError::Json( + "http json-rpc payload must be a JSON object".to_string(), + )); + } + }; + + let jsonrpc_value_option = object.get("jsonrpc"); + let jsonrpc_value = match jsonrpc_value_option { + Some(jsonrpc_value) => jsonrpc_value, + None => { + return Err(crate::KbError::Json( + "http json-rpc payload is missing 'jsonrpc'".to_string(), + )); + } + }; + + let jsonrpc_string_option = jsonrpc_value.as_str(); + let jsonrpc_string = match jsonrpc_string_option { + Some(jsonrpc_string) => jsonrpc_string, + None => { + return Err(crate::KbError::Json( + "http json-rpc field 'jsonrpc' must be a string".to_string(), + )); + } + }; + + if jsonrpc_string != "2.0" { + return Err(crate::KbError::Json(format!( + "unsupported http json-rpc version '{}'", + jsonrpc_string + ))); + } + + let has_result = object.contains_key("result"); + let has_error = object.contains_key("error"); + let has_id = object.contains_key("id"); + + if has_id && has_result && !has_error { + let response_result = serde_json::from_value::(value.clone()); + return match response_result { + Ok(response) => Ok(KbJsonRpcHttpResponse::Success(response)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse http json-rpc success response: {error}" + ))), + }; + } + + if has_id && has_error && !has_result { + let response_result = serde_json::from_value::(value.clone()); + return match response_result { + Ok(response) => Ok(KbJsonRpcHttpResponse::Error(response)), + Err(error) => Err(crate::KbError::Json(format!( + "cannot parse http json-rpc error response: {error}" + ))), + }; + } + Err(crate::KbError::Json( + "unsupported http json-rpc response shape".to_string(), + )) +} + +fn kb_http_shorten_text(input: &str, max_chars: usize) -> std::string::String { + let char_count = input.chars().count(); + if char_count <= max_chars { + return input.to_string(); + } + let shortened: std::string::String = input.chars().take(max_chars).collect(); + format!("{shortened} …[truncated {} chars]", char_count - max_chars) +} + +#[cfg(test)] +mod tests { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + #[derive(Debug)] + struct TestHttpServer { + url: std::string::String, + shutdown_tx: std::option::Option>, + observed_methods: std::sync::Arc>>, + } + + impl TestHttpServer { + async fn spawn() -> Self { + let observed_methods = + std::sync::Arc::new(tokio::sync::Mutex::new(std::vec::Vec::new())); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("listener bind must succeed"); + let local_addr = listener.local_addr().expect("local addr must exist"); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let observed_methods_for_server = observed_methods.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + break; + }, + accept_result = listener.accept() => { + let (mut stream, _peer_addr) = accept_result.expect("accept must succeed"); + let observed_methods_for_connection = observed_methods_for_server.clone(); + tokio::spawn(async move { + let mut buffer = vec![0u8; 8192]; + let read_result = stream.read(&mut buffer).await; + let bytes_read = read_result.expect("read must succeed"); + let request_text = + std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); + let split_result: std::vec::Vec<&str> = + request_text.split("\r\n\r\n").collect(); + let body = if split_result.len() >= 2 { + split_result[1].to_string() + } else { + std::string::String::new() + }; + let request_json: serde_json::Value = + serde_json::from_str(&body).expect("request body must be valid json"); + let method = request_json["method"] + .as_str() + .expect("method must be a string") + .to_string(); + { + let mut observed_methods_guard = + observed_methods_for_connection.lock().await; + observed_methods_guard.push(method.clone()); + } + let id = request_json["id"].clone(); + let response_body = if method == "getHealth" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": "ok", + "id": id + }).to_string() + } else if method == "getVersion" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": { + "solana-core": "2.2.3", + "feature-set": 123 + }, + "id": id + }).to_string() + } else if method == "getSlot" { + serde_json::json!({ + "jsonrpc": "2.0", + "result": 424242u64, + "id": id + }).to_string() + } else { + serde_json::json!({ + "jsonrpc": "2.0", + "error": { + "code": -32601, + "message": "Method not found" + }, + "id": id + }).to_string() + }; + let response_text = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + let _ = stream.write_all(response_text.as_bytes()).await; + let _ = stream.shutdown().await; + }); + } + } + } + }); + Self { + url: format!("http://{}", local_addr), + shutdown_tx: Some(shutdown_tx), + observed_methods, + } + } + + async fn observed_methods_snapshot(&self) -> std::vec::Vec { + let observed_methods_guard = self.observed_methods.lock().await; + observed_methods_guard.clone() + } + + async fn shutdown(mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } + } + + fn make_http_endpoint(url: std::string::String) -> crate::KbHttpEndpointConfig { + crate::KbHttpEndpointConfig { + name: "test_http".to_string(), + enabled: true, + provider: "test".to_string(), + url, + api_key_env_var: None, + roles: vec!["http_queries".to_string()], + requests_per_second: 20, + burst_capacity: 5, + connect_timeout_ms: 2000, + request_timeout_ms: 2000, + max_idle_connections_per_host: 4, + } + } + + #[test] + fn parse_http_success_response_works() { + let parsed = crate::parse_kb_json_rpc_http_response_text( + r#"{"jsonrpc":"2.0","result":"ok","id":1}"#, + ) + .expect("parse must succeed"); + match parsed { + crate::KbJsonRpcHttpResponse::Success(response) => { + assert_eq!(response.result, serde_json::Value::String("ok".to_string())); + assert_eq!(response.id, serde_json::Value::from(1u64)); + } + other => { + panic!("unexpected response: {other:?}"); + } + } + } + + #[test] + fn parse_http_error_response_works() { + let parsed = crate::parse_kb_json_rpc_http_response_text( + r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#, + ) + .expect("parse must succeed"); + match parsed { + crate::KbJsonRpcHttpResponse::Error(response) => { + assert_eq!(response.error.code, -32601); + assert_eq!(response.error.message, "Method not found"); + } + other => { + panic!("unexpected response: {other:?}"); + } + } + } + + #[tokio::test] + async fn next_request_id_is_shared_between_clones() { + let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let cloned = client.clone(); + assert_eq!(client.next_request_id(), 1); + assert_eq!(cloned.next_request_id(), 2); + assert_eq!(client.next_request_id(), 3); + } + + #[tokio::test] + async fn get_health_works() { + let server = TestHttpServer::spawn().await; + let endpoint = make_http_endpoint(server.url.clone()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let result = client.get_health().await.expect("get_health must succeed"); + assert_eq!(result, serde_json::Value::String("ok".to_string())); + let observed_methods = server.observed_methods_snapshot().await; + assert!(observed_methods.iter().any(|method| method == "getHealth")); + server.shutdown().await; + } + + #[tokio::test] + async fn get_version_works() { + let server = TestHttpServer::spawn().await; + let endpoint = make_http_endpoint(server.url.clone()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let result = client + .get_version() + .await + .expect("get_version must succeed"); + assert_eq!( + result["solana-core"], + serde_json::Value::String("2.2.3".to_string()) + ); + assert_eq!(result["feature-set"], serde_json::Value::from(123u64)); + let observed_methods = server.observed_methods_snapshot().await; + assert!(observed_methods.iter().any(|method| method == "getVersion")); + server.shutdown().await; + } + + #[tokio::test] + async fn get_slot_works() { + let server = TestHttpServer::spawn().await; + let endpoint = make_http_endpoint(server.url.clone()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let result = client + .get_slot(Some("finalized".to_string())) + .await + .expect("get_slot must succeed"); + assert_eq!(result, serde_json::Value::from(424242u64)); + let observed_methods = server.observed_methods_snapshot().await; + assert!(observed_methods.iter().any(|method| method == "getSlot")); + server.shutdown().await; + } + + #[tokio::test] + async fn unknown_method_returns_error() { + let server = TestHttpServer::spawn().await; + let endpoint = make_http_endpoint(server.url.clone()); + let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); + let result = client + .execute_json_rpc_request_raw("unknownMethod".to_string(), std::vec::Vec::new()) + .await; + assert!(result.is_err()); + let error = result.expect_err("unknown method must fail"); + match error { + crate::KbError::Http(message) => { + assert!(message.contains("Method not found")); + } + other => { + panic!("unexpected error: {other:?}"); + } + } + server.shutdown().await; } } diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index acb01da..40ee633 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -38,6 +38,13 @@ pub use crate::rpc_ws::kb_is_probable_json_rpc_object_text; pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_text; pub use crate::rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use crate::http_client::HttpClient; +pub use crate::http_client::KbJsonRpcHttpErrorObject; +pub use crate::http_client::KbJsonRpcHttpErrorResponse; +pub use crate::http_client::KbJsonRpcHttpRequest; +pub use crate::http_client::KbJsonRpcHttpResponse; +pub use crate::http_client::KbJsonRpcHttpSuccessResponse; +pub use crate::http_client::parse_kb_json_rpc_http_response_text; +pub use crate::http_client::parse_kb_json_rpc_http_response_value; pub use crate::tracing::KbTracingGuard; pub use crate::tracing::init_tracing; pub use crate::types::KbConnectionState;