// file: kb_lib/src/http_client.rs //! Generic asynchronous HTTP JSON-RPC client. //! //! This module provides a reusable `HttpClient` built on top of `reqwest` for //! Solana RPC HTTP endpoints. //! //! Version `0.4.2` extends the `0.4.1` transport layer with: //! - local endpoint status management (`Active`, `Paused`, `Disabled`) //! - method classification (`GeneralRpc`, `SendTransaction`, `HeavyRead`) //! - per-class local rate limiting //! - automatic pause after HTTP 429 responses /// JSON-RPC 2.0 request envelope for HTTP. #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub struct JsonRpcHttpRequest { /// JSON-RPC version, expected to be `"2.0"`. pub jsonrpc: std::string::String, /// Client request identifier. pub id: serde_json::Value, /// RPC method name. pub method: std::string::String, /// Ordered method parameters. pub params: std::vec::Vec, } impl JsonRpcHttpRequest { /// Creates a new request with a numeric identifier. pub fn new_with_u64_id( id: u64, method: std::string::String, params: std::vec::Vec, ) -> Self { return Self { jsonrpc: "2.0".to_string(), id: serde_json::Value::from(id), method, params, }; } /// Serializes the request into a compact JSON string. pub fn to_json_string(&self) -> Result { let text_result = serde_json::to_string(self); match text_result { Ok(text) => return Ok(text), Err(error) => { return Err(crate::Error::Json(format!( "cannot serialize http json-rpc request '{}': {error}", self.method ))); }, } } } /// JSON-RPC 2.0 success response. #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub struct JsonRpcHttpSuccessResponse { /// JSON-RPC version, expected to be `"2.0"`. pub jsonrpc: std::string::String, /// Result payload. pub result: serde_json::Value, /// Request identifier echoed by the server. pub id: serde_json::Value, } /// JSON-RPC 2.0 error object. 
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub struct JsonRpcHttpErrorObject { /// Numeric JSON-RPC error code. pub code: i64, /// Human-readable error message. pub message: std::string::String, /// Optional server-provided payload. pub data: std::option::Option, } /// JSON-RPC 2.0 error response. #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub struct JsonRpcHttpErrorResponse { /// JSON-RPC version, expected to be `"2.0"`. pub jsonrpc: std::string::String, /// Error payload. pub error: JsonRpcHttpErrorObject, /// Request identifier echoed by the server. pub id: serde_json::Value, } /// Parsed HTTP JSON-RPC response envelope. #[derive(Clone, Debug, PartialEq)] pub enum JsonRpcHttpResponse { /// Success response. Success(JsonRpcHttpSuccessResponse), /// Error response. Error(JsonRpcHttpErrorResponse), } /// Local HTTP method class used for independent limit buckets. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum HttpMethodClass { /// Standard RPC reads and generic methods. GeneralRpc, /// Transaction submission methods. SendTransaction, /// Resource-intensive read methods. HeavyRead, } /// Local endpoint status. #[derive(Clone, Debug, PartialEq, Eq)] pub enum HttpEndpointStatus { /// Endpoint is ready to accept requests. Active, /// Endpoint is temporarily paused. Paused { /// Remaining pause duration in milliseconds. remaining_ms: u64, }, /// Endpoint is manually disabled. 
Disabled, } #[derive(Debug)] struct HttpTokenBucket { tokens: f64, last_refill_at: std::time::Instant, } impl HttpTokenBucket { fn new(burst_capacity: u32) -> HttpTokenBucket { return HttpTokenBucket { tokens: burst_capacity as f64, last_refill_at: std::time::Instant::now(), }; } } #[derive(Clone, Debug)] enum HttpEndpointLifecycleState { Active, PausedUntil(std::time::Instant), Disabled, } #[derive(Debug)] struct HttpRuntimeState { lifecycle: HttpEndpointLifecycleState, general_bucket: HttpTokenBucket, send_transaction_bucket: HttpTokenBucket, heavy_read_bucket: HttpTokenBucket, } impl HttpRuntimeState { fn new(endpoint: &crate::HttpEndpointConfig) -> HttpRuntimeState { return HttpRuntimeState { lifecycle: HttpEndpointLifecycleState::Active, general_bucket: HttpTokenBucket::new(endpoint.burst_capacity), send_transaction_bucket: HttpTokenBucket::new(http_effective_burst_capacity( endpoint, HttpMethodClass::SendTransaction, )), heavy_read_bucket: HttpTokenBucket::new(http_effective_burst_capacity( endpoint, HttpMethodClass::HeavyRead, )), }; } } /// Generic asynchronous HTTP client. #[derive(Clone, Debug)] pub struct HttpClient { endpoint: crate::HttpEndpointConfig, resolved_url: std::string::String, client: reqwest::Client, next_request_id: std::sync::Arc, runtime: std::sync::Arc>, concurrency_limiter: std::sync::Arc, } impl HttpClient { /// Creates a new HTTP client bound to one endpoint configuration. 
///
/// # Errors
///
/// Returns `crate::Error::Config` when the endpoint name is blank or any
/// configured rate, burst, idle-connection or concurrency limit is zero,
/// propagates the error from `endpoint.resolved_url()`, and returns
/// `crate::Error::Http` when the `reqwest` client cannot be built.
pub fn new(endpoint: crate::HttpEndpointConfig) -> Result<Self, crate::Error> {
    if endpoint.name.trim().is_empty() {
        return Err(crate::Error::Config(
            "http client endpoint name must not be empty".to_string(),
        ));
    }
    if endpoint.requests_per_second == 0 {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have requests_per_second > 0",
            endpoint.name
        )));
    }
    if endpoint.burst_capacity == 0 {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have burst_capacity > 0",
            endpoint.name
        )));
    }
    // Optional per-class overrides may be absent, but never an explicit zero.
    if endpoint.send_transaction_requests_per_second == Some(0) {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have send_transaction_requests_per_second > 0 when configured",
            endpoint.name
        )));
    }
    if endpoint.send_transaction_burst_capacity == Some(0) {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have send_transaction_burst_capacity > 0 when configured",
            endpoint.name
        )));
    }
    if endpoint.heavy_requests_per_second == Some(0) {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have heavy_requests_per_second > 0 when configured",
            endpoint.name
        )));
    }
    if endpoint.heavy_burst_capacity == Some(0) {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have heavy_burst_capacity > 0 when configured",
            endpoint.name
        )));
    }
    if endpoint.max_idle_connections_per_host == 0 {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have max_idle_connections_per_host > 0",
            endpoint.name
        )));
    }
    if endpoint.max_concurrent_requests_per_endpoint == 0 {
        return Err(crate::Error::Config(format!(
            "http endpoint '{}' must have max_concurrent_requests_per_endpoint > 0",
            endpoint.name
        )));
    }

    let resolved_url = match endpoint.resolved_url() {
        Ok(resolved_url) => resolved_url,
        Err(error) => return Err(error),
    };

    let user_agent = format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
    let build_result = reqwest::Client::builder()
        .connect_timeout(std::time::Duration::from_millis(endpoint.connect_timeout_ms))
        .timeout(std::time::Duration::from_millis(endpoint.request_timeout_ms))
        .pool_max_idle_per_host(endpoint.max_idle_connections_per_host)
        .user_agent(user_agent)
        .build();
    let client = match build_result {
        Ok(client) => client,
        Err(error) => {
            return Err(crate::Error::Http(format!(
                "cannot build reqwest client for endpoint '{}': {error}",
                endpoint.name
            )));
        },
    };

    // Derive everything that borrows `endpoint` first so the configuration
    // can then be moved into the client without cloning it.
    let runtime =
        std::sync::Arc::new(tokio::sync::Mutex::new(HttpRuntimeState::new(&endpoint)));
    let concurrency_limiter = std::sync::Arc::new(tokio::sync::Semaphore::new(
        endpoint.max_concurrent_requests_per_endpoint,
    ));
    return Ok(Self {
        endpoint,
        resolved_url,
        client,
        // Request identifiers start at 1.
        next_request_id: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
        runtime,
        concurrency_limiter,
    });
}

/// Returns the endpoint name.
pub fn endpoint_name(&self) -> &str {
    return self.endpoint.name.as_str();
}

/// Returns the resolved endpoint URL.
pub fn endpoint_url(&self) -> &str {
    return self.resolved_url.as_str();
}

/// Returns the endpoint configuration.
pub fn endpoint_config(&self) -> &crate::HttpEndpointConfig {
    return &self.endpoint;
}

/// Returns whether this endpoint supports the requested logical role.
///
/// A blank `required_role` matches every endpoint; otherwise the role must
/// be equal to one of the configured roles.
pub fn supports_role(&self, required_role: &str) -> bool {
    if required_role.trim().is_empty() {
        return true;
    }
    for role in self.endpoint.roles.iter() {
        if role == required_role {
            return true;
        }
    }
    return false;
}

/// Returns the currently available concurrency slots for this endpoint.
pub fn available_concurrency_slots(&self) -> usize {
    return self.concurrency_limiter.available_permits();
}

/// Returns the next request identifier and increments the internal counter.
pub fn next_request_id(&self) -> u64 {
    return self.next_request_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}

/// Returns the current local endpoint status.
///
/// An expired pause is cleared as a side effect, so an endpoint whose pause
/// deadline has passed is reported (and stored) as `Active`.
pub async fn endpoint_status(&self) -> HttpEndpointStatus {
    let mut runtime_guard = self.runtime.lock().await;
    http_normalize_runtime_lifecycle(&mut runtime_guard);
    match runtime_guard.lifecycle.clone() {
        HttpEndpointLifecycleState::Active => return HttpEndpointStatus::Active,
        HttpEndpointLifecycleState::Disabled => return HttpEndpointStatus::Disabled,
        HttpEndpointLifecycleState::PausedUntil(deadline) => {
            let now = std::time::Instant::now();
            // The deadline may have passed between the normalization above
            // and this second clock read.
            if deadline <= now {
                runtime_guard.lifecycle = HttpEndpointLifecycleState::Active;
                return HttpEndpointStatus::Active;
            }
            // Saturate instead of truncating if the value exceeds `u64`.
            let remaining_ms =
                u64::try_from(deadline.duration_since(now).as_millis()).unwrap_or(u64::MAX);
            return HttpEndpointStatus::Paused { remaining_ms };
        },
    }
}

/// Pauses this endpoint locally before future sends.
///
/// The pause lasts `duration_ms` milliseconds from now and replaces any
/// pause already in effect. A disabled endpoint is left disabled: the
/// automatic pause applied after an HTTP 429 goes through this method, and
/// it must not turn a manual `disable()` into a pause that later expires
/// back to `Active`.
pub async fn pause_for(&self, duration_ms: u64) {
    let pause_duration = std::time::Duration::from_millis(duration_ms);
    let pause_deadline = std::time::Instant::now() + pause_duration;
    let mut runtime_guard = self.runtime.lock().await;
    if matches!(runtime_guard.lifecycle, HttpEndpointLifecycleState::Disabled) {
        return;
    }
    runtime_guard.lifecycle = HttpEndpointLifecycleState::PausedUntil(pause_deadline);
}

/// Resumes this endpoint if it is paused.
///
/// `Active` and `Disabled` endpoints are left unchanged.
pub async fn resume(&self) {
    let mut runtime_guard = self.runtime.lock().await;
    if let HttpEndpointLifecycleState::PausedUntil(_) = runtime_guard.lifecycle {
        runtime_guard.lifecycle = HttpEndpointLifecycleState::Active;
    }
}

/// Disables this endpoint locally.
pub async fn disable(&self) {
    let mut runtime_guard = self.runtime.lock().await;
    runtime_guard.lifecycle = HttpEndpointLifecycleState::Disabled;
}

/// Re-enables this endpoint locally.
pub async fn enable(&self) { let mut runtime_guard = self.runtime.lock().await; runtime_guard.lifecycle = HttpEndpointLifecycleState::Active; } /// Classifies one HTTP JSON-RPC method. pub fn classify_method(method: &str) -> HttpMethodClass { return http_classify_method(method); } /// Executes one JSON-RPC request and returns the success envelope. pub async fn execute_json_rpc_request_raw( &self, method: std::string::String, params: std::vec::Vec, ) -> Result { let request_id = self.next_request_id(); let request = JsonRpcHttpRequest::new_with_u64_id(request_id, method, params); return self.execute_json_rpc_request_object(&request).await; } /// Executes one prebuilt JSON-RPC request object. pub async fn execute_json_rpc_request_object( &self, request: &JsonRpcHttpRequest, ) -> Result { let method_class = http_classify_method(&request.method); let rate_limit_result = self.acquire_rate_limit_slot_for_method_class(method_class).await; if let Err(error) = rate_limit_result { return Err(error); } let body_result = request.to_json_string(); let body = match body_result { Ok(body) => body, Err(error) => return Err(error), }; let concurrency_permit_result = self.concurrency_limiter.clone().acquire_owned().await; let _concurrency_permit = match concurrency_permit_result { Ok(concurrency_permit) => concurrency_permit, Err(error) => { return Err(crate::Error::Http(format!( "cannot acquire concurrency slot for endpoint '{}' method '{}': {}", self.endpoint.name, request.method, error ))); }, }; tracing::trace!( endpoint_name = %self.endpoint.name, endpoint_url = %self.resolved_url, method = %request.method, method_class = ?method_class, request_id = %request.id, "sending http json-rpc request" ); let send_result = self .client .post(self.resolved_url.clone()) .header("content-type", "application/json") .body(body) .send() .await; let response = match send_result { Ok(response) => response, Err(error) => { return Err(crate::Error::Http(format!( "http request failed for endpoint 
'{}' method '{}': {error}", self.endpoint.name, request.method ))); }, }; let status = response.status(); let retry_after_header = response.headers().get(reqwest::header::RETRY_AFTER).cloned(); let text_result = response.text().await; let text = match text_result { Ok(text) => text, Err(error) => { return Err(crate::Error::Http(format!( "cannot read http response body for endpoint '{}' method '{}': {error}", self.endpoint.name, request.method ))); }, }; if status.as_u16() == 429 { let pause_duration_ms = match retry_after_header.as_ref().and_then(http_retry_after_to_pause_ms) { Some(retry_after_ms) => retry_after_ms, None => self.endpoint.pause_after_http_429_ms.unwrap_or(1500), }; self.pause_for(pause_duration_ms).await; return Err(crate::Error::Http(format!( "http status 429 returned by endpoint '{}' method '{}'; endpoint paused for {} ms; body='{}'", self.endpoint.name, request.method, pause_duration_ms, http_shorten_text(&text, 512) ))); } if !status.is_success() { return Err(crate::Error::Http(format!( "http status {} returned by endpoint '{}' method '{}' body='{}'", status, self.endpoint.name, request.method, http_shorten_text(&text, 512) ))); } let parse_result = parse_json_rpc_http_response_text(&text); let parsed_response = match parse_result { Ok(parsed_response) => parsed_response, Err(error) => return Err(error), }; match parsed_response { JsonRpcHttpResponse::Success(success_response) => return Ok(success_response), JsonRpcHttpResponse::Error(error_response) => { return Err(crate::Error::Http(format!( "json-rpc http error on endpoint '{}' method '{}': code={} message={}", self.endpoint.name, request.method, error_response.error.code, error_response.error.message ))); }, } } /// Executes one JSON-RPC request and returns only the raw `result` field. 
pub async fn execute_json_rpc_result_raw( &self, method: std::string::String, params: std::vec::Vec, ) -> Result { let raw_result = self.execute_json_rpc_request_raw(method, params).await; let raw_response = match raw_result { Ok(raw_response) => raw_response, Err(error) => return Err(error), }; return Ok(raw_response.result); } /// Executes one JSON-RPC request and decodes `result` into `T`. pub async fn execute_json_rpc_request_typed( &self, method: std::string::String, params: std::vec::Vec, ) -> Result where T: serde::de::DeserializeOwned, { let raw_result = self.execute_json_rpc_result_raw(method, params).await; let raw_value = match raw_result { Ok(raw_value) => raw_value, Err(error) => return Err(error), }; let typed_result = serde_json::from_value::(raw_value); match typed_result { Ok(value) => return Ok(value), Err(error) => { return Err(crate::Error::Json(format!( "cannot decode typed http json-rpc result: {error}" ))); }, } } /// Raw helper for `getHealth`. pub async fn get_health_raw(&self) -> Result { return self .execute_json_rpc_result_raw("getHealth".to_string(), std::vec::Vec::new()) .await; } /// Typed helper for `getHealth`. pub async fn get_health(&self) -> Result { return self .execute_json_rpc_request_typed::( "getHealth".to_string(), std::vec::Vec::new(), ) .await; } /// Raw helper for `getVersion`. pub async fn get_version_raw(&self) -> Result { return self .execute_json_rpc_result_raw("getVersion".to_string(), std::vec::Vec::new()) .await; } /// Typed helper for `getVersion`. pub async fn get_version( &self, ) -> Result { return self .execute_json_rpc_request_typed::( "getVersion".to_string(), std::vec::Vec::new(), ) .await; } /// Raw helper for `getSlot`. pub async fn get_slot_raw( &self, config: std::option::Option, ) -> Result { let params = build_optional_config_only_params(config); return self.execute_json_rpc_result_raw("getSlot".to_string(), params).await; } /// Typed helper for `getSlot`. 
pub async fn get_slot( &self, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_json_value(config, "getSlot config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_optional_config_only_params(config_value); return self.execute_json_rpc_request_typed::("getSlot".to_string(), params).await; } /// Raw helper for `getBlockHeight`. pub async fn get_block_height_raw( &self, config: std::option::Option, ) -> Result { let params = build_optional_config_only_params(config); return self.execute_json_rpc_result_raw("getBlockHeight".to_string(), params).await; } /// Typed helper for `getBlockHeight`. pub async fn get_block_height( &self, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_json_value(config, "getBlockHeight config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_optional_config_only_params(config_value); return self .execute_json_rpc_request_typed::("getBlockHeight".to_string(), params) .await; } /// Raw helper for `getLatestBlockhash`. pub async fn get_latest_blockhash_raw( &self, config: std::option::Option, ) -> Result { let params = build_optional_config_only_params(config); return self.execute_json_rpc_result_raw("getLatestBlockhash".to_string(), params).await; } /// Typed helper for `getLatestBlockhash`. 
pub async fn get_latest_blockhash( &self, config: std::option::Option, ) -> Result< solana_rpc_client_api::response::Response, crate::Error, > { let config_value_result = serialize_optional_json_value(config, "getLatestBlockhash config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_optional_config_only_params(config_value); return self.execute_json_rpc_request_typed::< solana_rpc_client_api::response::Response< solana_rpc_client_api::response::RpcBlockhash, >, >("getLatestBlockhash".to_string(), params) .await; } /// Raw helper for `getBalance`. pub async fn get_balance_raw( &self, address: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(address, config); return self.execute_json_rpc_result_raw("getBalance".to_string(), params).await; } /// Typed helper for `getBalance`. pub async fn get_balance( &self, address: std::string::String, config: std::option::Option, ) -> Result, crate::Error> { let config_value_result = serialize_optional_json_value(config, "getBalance config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_first_string_optional_config_params(address, config_value); return self .execute_json_rpc_request_typed::>( "getBalance".to_string(), params, ) .await; } /// Raw helper for `getAccountInfo`. pub async fn get_account_info_raw( &self, address: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(address, config); return self.execute_json_rpc_result_raw("getAccountInfo".to_string(), params).await; } /// Typed helper for `getAccountInfo`. 
pub async fn get_account_info( &self, address: std::string::String, config: std::option::Option, ) -> Result< solana_rpc_client_api::response::Response< std::option::Option, >, crate::Error, > { let config_value_result = serialize_optional_json_value(config, "getAccountInfo config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_first_string_optional_config_params(address, config_value); return self .execute_json_rpc_request_typed::, >>("getAccountInfo".to_string(), params) .await; } /// Raw helper for `getProgramAccounts`. pub async fn get_program_accounts_raw( &self, program_id: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(program_id, config); return self.execute_json_rpc_result_raw("getProgramAccounts".to_string(), params).await; } /// Raw helper for `getSignaturesForAddress`. pub async fn get_signatures_for_address_raw( &self, address: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(address, config); return self .execute_json_rpc_result_raw("getSignaturesForAddress".to_string(), params) .await; } /// Typed helper for `getSignaturesForAddress`. pub async fn get_signatures_for_address( &self, address: std::string::String, config: std::option::Option, ) -> Result< std::vec::Vec, crate::Error, > { let config_value_result = serialize_optional_json_value(config, "getSignaturesForAddress config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_first_string_optional_config_params(address, config_value); return self .execute_json_rpc_request_typed::>("getSignaturesForAddress".to_string(), params) .await; } /// Raw helper for `getTransaction`. 
pub async fn get_transaction_raw( &self, signature: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(signature, config); return self.execute_json_rpc_result_raw("getTransaction".to_string(), params).await; } /// Typed helper for `getTransaction`. pub async fn get_transaction( &self, signature: std::string::String, config: std::option::Option, ) -> Result< std::option::Option, crate::Error, > { let config_value_result = serialize_optional_json_value(config, "getTransaction config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_first_string_optional_config_params(signature, config_value); return self.execute_json_rpc_request_typed::< std::option::Option, >("getTransaction".to_string(), params) .await; } /// Raw helper for `sendTransaction`. pub async fn send_transaction_raw( &self, encoded_transaction: std::string::String, config: std::option::Option, ) -> Result { let params = build_first_string_optional_config_params(encoded_transaction, config); return self.execute_json_rpc_result_raw("sendTransaction".to_string(), params).await; } /// Typed helper for `sendTransaction`. 
pub async fn send_transaction( &self, encoded_transaction: std::string::String, config: std::option::Option, ) -> Result { let config_value_result = serialize_optional_json_value(config, "sendTransaction config"); let config_value = match config_value_result { Ok(config_value) => config_value, Err(error) => return Err(error), }; let params = build_first_string_optional_config_params(encoded_transaction, config_value); return self .execute_json_rpc_request_typed::( "sendTransaction".to_string(), params, ) .await; } async fn acquire_rate_limit_slot_for_method_class( &self, method_class: HttpMethodClass, ) -> Result<(), crate::Error> { loop { let wait_duration_option = { let mut runtime_guard = self.runtime.lock().await; http_normalize_runtime_lifecycle(&mut runtime_guard); match runtime_guard.lifecycle.clone() { HttpEndpointLifecycleState::Disabled => { return Err(crate::Error::Http(format!( "http endpoint '{}' is disabled", self.endpoint.name ))); }, HttpEndpointLifecycleState::PausedUntil(deadline) => { let now = std::time::Instant::now(); if deadline > now { Some(deadline.duration_since(now)) } else { runtime_guard.lifecycle = HttpEndpointLifecycleState::Active; http_consume_rate_limit_token( &self.endpoint, &mut runtime_guard, method_class, ) } }, HttpEndpointLifecycleState::Active => http_consume_rate_limit_token( &self.endpoint, &mut runtime_guard, method_class, ), } }; match wait_duration_option { Some(wait_duration) => { tokio::time::sleep(wait_duration).await; }, None => { break; }, } } return Ok(()); } } /// Parses one JSON-RPC HTTP response text. pub fn parse_json_rpc_http_response_text(text: &str) -> Result { let value_result = serde_json::from_str::(text); let value = match value_result { Ok(value) => value, Err(error) => { return Err(crate::Error::Json(format!("cannot parse http json-rpc text: {error}"))); }, }; return parse_json_rpc_http_response_value(&value); } /// Parses one JSON-RPC HTTP response value. 
///
/// The payload must be a JSON object whose `jsonrpc` member is the string
/// `"2.0"`. It is a success response when it has `id` and `result` but no
/// `error`, and an error response when it has `id` and `error` but no
/// `result`.
///
/// # Errors
///
/// Returns `crate::Error::Json` for any other shape, for an unsupported
/// version, or when the matching response type cannot be deserialized.
pub fn parse_json_rpc_http_response_value(
    value: &serde_json::Value,
) -> Result<JsonRpcHttpResponse, crate::Error> {
    let object = match value.as_object() {
        Some(object) => object,
        None => {
            return Err(crate::Error::Json(
                "http json-rpc payload must be a JSON object".to_string(),
            ));
        },
    };
    let jsonrpc_value = match object.get("jsonrpc") {
        Some(jsonrpc_value) => jsonrpc_value,
        None => {
            return Err(crate::Error::Json(
                "http json-rpc payload is missing 'jsonrpc'".to_string(),
            ));
        },
    };
    let jsonrpc_string = match jsonrpc_value.as_str() {
        Some(jsonrpc_string) => jsonrpc_string,
        None => {
            return Err(crate::Error::Json(
                "http json-rpc field 'jsonrpc' must be a string".to_string(),
            ));
        },
    };
    if jsonrpc_string != "2.0" {
        return Err(crate::Error::Json(format!(
            "unsupported http json-rpc version '{}'",
            jsonrpc_string
        )));
    }

    let has_id = object.contains_key("id");
    let has_result = object.contains_key("result");
    let has_error = object.contains_key("error");
    match (has_id, has_result, has_error) {
        (true, true, false) => {
            match serde_json::from_value::<JsonRpcHttpSuccessResponse>(value.clone()) {
                Ok(response) => return Ok(JsonRpcHttpResponse::Success(response)),
                Err(error) => {
                    return Err(crate::Error::Json(format!(
                        "cannot parse http json-rpc success response: {error}"
                    )));
                },
            }
        },
        (true, false, true) => {
            match serde_json::from_value::<JsonRpcHttpErrorResponse>(value.clone()) {
                Ok(response) => return Ok(JsonRpcHttpResponse::Error(response)),
                Err(error) => {
                    return Err(crate::Error::Json(format!(
                        "cannot parse http json-rpc error response: {error}"
                    )));
                },
            }
        },
        _ => {
            return Err(crate::Error::Json(
                "unsupported http json-rpc response shape".to_string(),
            ));
        },
    }
}

/// Builds the parameter list `[config]`, or `[]` when `config` is `None`.
fn build_optional_config_only_params(
    config: std::option::Option<serde_json::Value>,
) -> std::vec::Vec<serde_json::Value> {
    return config.into_iter().collect();
}

/// Builds the parameter list `[first, config]`, or `[first]` when `config`
/// is `None`.
fn build_first_string_optional_config_params(
    first: std::string::String,
    config: std::option::Option<serde_json::Value>,
) -> std::vec::Vec<serde_json::Value> {
    let mut params = vec![serde_json::Value::String(first)];
    params.extend(config);
    return params;
}

/// Converts an optional serializable value into an optional JSON value.
///
/// `label` names the value in the `crate::Error::Json` message produced
/// when serialization fails.
fn serialize_optional_json_value<T>(
    value: std::option::Option<T>,
    label: &str,
) -> Result<std::option::Option<serde_json::Value>, crate::Error>
where
    T: serde::Serialize,
{
    let inner = match value {
        Some(inner) => inner,
        None => return Ok(None),
    };
    match serde_json::to_value(inner) {
        Ok(json_value) => return Ok(Some(json_value)),
        Err(error) => {
            return Err(crate::Error::Json(format!("cannot serialize {}: {error}", label)));
        },
    }
}

/// Maps a JSON-RPC method name to its local rate-limit class.
///
/// Unlisted methods fall into `GeneralRpc`.
fn http_classify_method(method: &str) -> HttpMethodClass {
    match method {
        "sendTransaction" | "sendRawTransaction" => return HttpMethodClass::SendTransaction,
        "getProgramAccounts" | "getLargestAccounts" | "getTransaction" | "getBlock" => {
            return HttpMethodClass::HeavyRead;
        },
        _ => return HttpMethodClass::GeneralRpc,
    }
}

/// Returns the refill rate for `method_class`, falling back to the general
/// `requests_per_second` when no class-specific value is configured.
fn http_effective_requests_per_second(
    endpoint: &crate::HttpEndpointConfig,
    method_class: HttpMethodClass,
) -> u32 {
    let class_override = match method_class {
        HttpMethodClass::GeneralRpc => None,
        HttpMethodClass::SendTransaction => endpoint.send_transaction_requests_per_second,
        HttpMethodClass::HeavyRead => endpoint.heavy_requests_per_second,
    };
    return class_override.unwrap_or(endpoint.requests_per_second);
}

/// Returns the burst capacity for `method_class`, falling back to the
/// general `burst_capacity` when no class-specific value is configured.
fn http_effective_burst_capacity(
    endpoint: &crate::HttpEndpointConfig,
    method_class: HttpMethodClass,
) -> u32 {
    let class_override = match method_class {
        HttpMethodClass::GeneralRpc => None,
        HttpMethodClass::SendTransaction => endpoint.send_transaction_burst_capacity,
        HttpMethodClass::HeavyRead => endpoint.heavy_burst_capacity,
    };
    return class_override.unwrap_or(endpoint.burst_capacity);
}

/// Switches an expired `PausedUntil` state back to `Active`.
fn http_normalize_runtime_lifecycle(runtime: &mut HttpRuntimeState) {
    let pause_expired = match runtime.lifecycle {
        HttpEndpointLifecycleState::PausedUntil(deadline) => {
            deadline <= std::time::Instant::now()
        },
        HttpEndpointLifecycleState::Active | HttpEndpointLifecycleState::Disabled => false,
    };
    if pause_expired {
        runtime.lifecycle = HttpEndpointLifecycleState::Active;
    }
}

/// Refills the bucket of `method_class` and tries to take one token.
///
/// Returns `None` when a token was consumed, otherwise `Some(wait)` with
/// the time until one full token is available (at least 1 ms). No token is
/// consumed in the `Some` case.
fn http_consume_rate_limit_token(
    endpoint: &crate::HttpEndpointConfig,
    runtime: &mut HttpRuntimeState,
    method_class: HttpMethodClass,
) -> std::option::Option<std::time::Duration> {
    // `u32 -> f64` is lossless.
    let refill_per_second = f64::from(http_effective_requests_per_second(endpoint, method_class));
    let capacity = f64::from(http_effective_burst_capacity(endpoint, method_class));
    let bucket = match method_class {
        HttpMethodClass::GeneralRpc => &mut runtime.general_bucket,
        HttpMethodClass::SendTransaction => &mut runtime.send_transaction_bucket,
        HttpMethodClass::HeavyRead => &mut runtime.heavy_read_bucket,
    };

    // Credit the tokens earned since the last refill, capped at capacity.
    let now = std::time::Instant::now();
    let elapsed_seconds = now.duration_since(bucket.last_refill_at).as_secs_f64();
    bucket.tokens = (bucket.tokens + elapsed_seconds * refill_per_second).min(capacity);
    bucket.last_refill_at = now;

    if bucket.tokens >= 1.0 {
        bucket.tokens -= 1.0;
        return None;
    }
    let wait_seconds = (1.0 - bucket.tokens) / refill_per_second;
    return Some(std::time::Duration::from_secs_f64(wait_seconds.max(0.001)));
}

/// Converts a `Retry-After` header value into a pause length in milliseconds.
///
/// Accepts a non-negative integer number of seconds or an RFC 2822 date; a
/// date in the past yields `Some(0)`. Returns `None` when the value is not
/// visible ASCII or matches neither form.
fn http_retry_after_to_pause_ms(
    header_value: &reqwest::header::HeaderValue,
) -> std::option::Option<u64> {
    let header_text = match header_value.to_str() {
        Ok(header_text) => header_text.trim(),
        Err(_) => return None,
    };
    if let Ok(seconds) = header_text.parse::<u64>() {
        return Some(seconds.saturating_mul(1000));
    }
    let retry_at = match chrono::DateTime::parse_from_rfc2822(header_text) {
        Ok(parsed_date) => parsed_date.with_timezone(&chrono::Utc),
        Err(_) => return None,
    };
    let delta_ms = retry_at.signed_duration_since(chrono::Utc::now()).num_milliseconds();
    // A negative delta (date already passed) fails the conversion and maps to 0.
    return Some(u64::try_from(delta_ms).unwrap_or(0));
}

/// Returns `input` limited to `max_chars` characters.
///
/// Longer input is cut on a character boundary and suffixed with the number
/// of characters removed; shorter input is returned unchanged.
fn http_shorten_text(input: &str, max_chars: usize) -> std::string::String {
    // Byte offset of the first character that does not fit, if any.
    let cut_at = match input.char_indices().nth(max_chars) {
        Some((byte_index, _)) => byte_index,
        None => return input.to_string(),
    };
    let (shortened, dropped) = input.split_at(cut_at);
    return format!("{shortened} …[truncated {} chars]", dropped.chars().count());
}

#[cfg(test)] mod tests { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; #[derive(Debug)] struct TestHttpServer { url: std::string::String, shutdown_tx: std::option::Option>, observed_methods: std::sync::Arc>>, } impl TestHttpServer { async fn spawn() -> Self { let observed_methods = std::sync::Arc::new(tokio::sync::Mutex::new(std::vec::Vec::new())); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("listener bind must succeed"); let local_addr = listener.local_addr().expect("local addr must exist"); let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let observed_methods_for_server = observed_methods.clone(); tokio::spawn(async move { loop { tokio::select!
{ _ = &mut shutdown_rx => { break; }, accept_result = listener.accept() => { let (mut stream, _peer_addr) = accept_result.expect("accept must succeed"); let observed_methods_for_connection = observed_methods_for_server.clone(); tokio::spawn(async move { let mut buffer = vec![0u8; 65536]; let read_result = stream.read(&mut buffer).await; let bytes_read = read_result.expect("read must succeed"); let request_text = std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string(); let split_result: std::vec::Vec<&str> = request_text.split("\r\n\r\n").collect(); let body = if split_result.len() >= 2 { split_result[1].to_string() } else { std::string::String::new() }; let request_json: serde_json::Value = serde_json::from_str(&body).expect("request body must be valid json"); let method = request_json["method"] .as_str() .expect("method must be a string") .to_string(); { let mut observed_methods_guard = observed_methods_for_connection.lock().await; observed_methods_guard.push(method.clone()); } let id = request_json["id"].clone(); if method == "rateLimitMe" { let response_body = serde_json::json!({ "jsonrpc": "2.0", "error": { "code": 42900, "message": "Too many requests" }, "id": id }).to_string(); let response_text = format!( "HTTP/1.1 429 TOO MANY REQUESTS\r\nContent-Type: application/json\r\nRetry-After: 1\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", response_body.len(), response_body ); let _ = stream.write_all(response_text.as_bytes()).await; let _ = stream.shutdown().await; return; } let response_body = if method == "getHealth" { serde_json::json!({ "jsonrpc": "2.0", "result": "ok", "id": id }).to_string() } else if method == "getVersion" { serde_json::json!({ "jsonrpc": "2.0", "result": { "solana-core": "2.2.3", "feature-set": 123 }, "id": id }).to_string() } else if method == "getSlot" { serde_json::json!({ "jsonrpc": "2.0", "result": 424242u64, "id": id }).to_string() } else if method == "getBlockHeight" { serde_json::json!({ "jsonrpc": "2.0", 
"result": 919191u64, "id": id }).to_string() } else if method == "getLatestBlockhash" { serde_json::json!({ "jsonrpc": "2.0", "result": { "context": { "slot": 999u64 }, "value": { "blockhash": crate::SYSTEM_PROGRAM_ID, "lastValidBlockHeight": 12345u64 } }, "id": id }).to_string() } else if method == "getBalance" { serde_json::json!({ "jsonrpc": "2.0", "result": { "context": { "slot": 77u64 }, "value": 5000u64 }, "id": id }).to_string() } else if method == "getSignaturesForAddress" { serde_json::json!({ "jsonrpc": "2.0", "result": [ { "signature": "5h6xBEauJ3PK6SWC7r7J2W8mE1D7aQj4J6Jg8n1SmWnVqSg9H6gq2K7xwJkL2GZ2RZ6n9wYk9cW1b2V3a4d5e6f7", "slot": 88u64, "err": null, "memo": null, "blockTime": 1700000000i64, "confirmationStatus": "finalized" } ], "id": id }).to_string() } else if method == "getTransaction" { serde_json::json!({ "jsonrpc": "2.0", "result": null, "id": id }).to_string() } else if method == "getAccountInfo" { serde_json::json!({ "jsonrpc": "2.0", "result": { "context": { "slot": 55u64 }, "value": { "data": ["", "base64"], "executable": false, "lamports": 1u64, "owner": crate::SYSTEM_PROGRAM_ID, "rentEpoch": 0u64, "space": 0u64 } }, "id": id }).to_string() } else if method == "getProgramAccounts" { serde_json::json!({ "jsonrpc": "2.0", "result": [], "id": id }).to_string() } else if method == "sendTransaction" { serde_json::json!({ "jsonrpc": "2.0", "result": "signature-test", "id": id }).to_string() } else { serde_json::json!({ "jsonrpc": "2.0", "error": { "code": -32601, "message": "Method not found" }, "id": id }).to_string() }; let response_text = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", response_body.len(), response_body ); let _ = stream.write_all(response_text.as_bytes()).await; let _ = stream.shutdown().await; }); } } } }); return Self { url: format!("http://{}", local_addr), shutdown_tx: Some(shutdown_tx), observed_methods, }; } async fn observed_methods_snapshot(&self) -> 
std::vec::Vec { let observed_methods_guard = self.observed_methods.lock().await; return observed_methods_guard.clone(); } async fn shutdown(mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { let _ = shutdown_tx.send(()); } } } fn make_http_endpoint(url: std::string::String) -> crate::HttpEndpointConfig { return crate::HttpEndpointConfig { name: "test_http".to_string(), enabled: true, provider: "test".to_string(), url, api_key_env_var: None, roles: vec!["http_queries".to_string()], requests_per_second: 20, burst_capacity: 5, send_transaction_requests_per_second: Some(2), send_transaction_burst_capacity: Some(1), heavy_requests_per_second: Some(1), heavy_burst_capacity: Some(1), connect_timeout_ms: 2000, request_timeout_ms: 2000, max_idle_connections_per_host: 4, pause_after_http_429_ms: Some(1500), max_concurrent_requests_per_endpoint: 2, }; } #[test] fn parse_http_success_response_works() { let parsed = crate::parse_json_rpc_http_response_text(r#"{"jsonrpc":"2.0","result":"ok","id":1}"#) .expect("parse must succeed"); match parsed { crate::JsonRpcHttpResponse::Success(response) => { assert_eq!(response.result, serde_json::Value::String("ok".to_string())); assert_eq!(response.id, serde_json::Value::from(1u64)); }, other => { panic!("unexpected response: {other:?}"); }, } } #[test] fn parse_http_error_response_works() { let parsed = crate::parse_json_rpc_http_response_text( r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#, ) .expect("parse must succeed"); match parsed { crate::JsonRpcHttpResponse::Error(response) => { assert_eq!(response.error.code, -32601); assert_eq!(response.error.message, "Method not found"); }, other => { panic!("unexpected response: {other:?}"); }, } } #[test] fn classify_method_distinguishes_general_send_and_heavy() { assert_eq!( crate::HttpClient::classify_method("getSlot"), crate::HttpMethodClass::GeneralRpc ); assert_eq!( crate::HttpClient::classify_method("sendTransaction"), 
crate::HttpMethodClass::SendTransaction ); assert_eq!( crate::HttpClient::classify_method("getProgramAccounts"), crate::HttpMethodClass::HeavyRead ); assert_eq!( crate::HttpClient::classify_method("getTransaction"), crate::HttpMethodClass::HeavyRead ); } #[tokio::test] async fn next_request_id_is_shared_between_clones() { let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let cloned = client.clone(); assert_eq!(client.next_request_id(), 1); assert_eq!(cloned.next_request_id(), 2); assert_eq!(client.next_request_id(), 3); } #[tokio::test] async fn manual_pause_and_resume_work() { let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let initial_status = client.endpoint_status().await; assert_eq!(initial_status, crate::HttpEndpointStatus::Active); client.pause_for(25).await; let paused_status = client.endpoint_status().await; match paused_status { crate::HttpEndpointStatus::Paused { remaining_ms } => { assert!(remaining_ms > 0); }, other => { panic!("unexpected status: {other:?}"); }, } client.resume().await; let resumed_status = client.endpoint_status().await; assert_eq!(resumed_status, crate::HttpEndpointStatus::Active); client.disable().await; let disabled_status = client.endpoint_status().await; assert_eq!(disabled_status, crate::HttpEndpointStatus::Disabled); client.enable().await; let enabled_status = client.endpoint_status().await; assert_eq!(enabled_status, crate::HttpEndpointStatus::Active); } #[tokio::test] async fn typed_helpers_work_for_basic_methods() { let server = TestHttpServer::spawn().await; let endpoint = make_http_endpoint(server.url.clone()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let health = client.get_health().await.expect("get_health must succeed"); assert_eq!(health, "ok".to_string()); 
let version = client.get_version().await.expect("get_version must succeed"); assert_eq!(version.solana_core, "2.2.3".to_string()); let slot = client.get_slot(None).await.expect("get_slot must succeed"); assert_eq!(slot, 424242u64); let block_height = client.get_block_height(None).await.expect("get_block_height must succeed"); assert_eq!(block_height, 919191u64); let latest_blockhash = client .get_latest_blockhash(None) .await .expect("get_latest_blockhash must succeed"); assert_eq!(latest_blockhash.context.slot, 999u64); assert_eq!(latest_blockhash.value.blockhash, crate::SYSTEM_PROGRAM_ID.to_string()); assert_eq!(latest_blockhash.value.last_valid_block_height, 12345u64); let balance = client .get_balance(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_balance must succeed"); assert_eq!(balance.context.slot, 77u64); assert_eq!(balance.value, 5000u64); let signatures = client .get_signatures_for_address(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_signatures_for_address must succeed"); assert_eq!(signatures.len(), 1); assert_eq!(signatures[0].slot, 88u64); let transaction = client .get_transaction( "5h6xBEauJ3PK6SWC7r7J2W8mE1D7aQj4J6Jg8n1SmWnVqSg9H6gq2K7xwJkL2GZ2RZ6n9wYk9cW1b2V3a4d5e6f7".to_string(), None, ) .await .expect("get_transaction must succeed"); assert!(transaction.is_none()); let sent_signature = client .send_transaction("AAAA".to_string(), None) .await .expect("send_transaction must succeed"); assert_eq!(sent_signature, "signature-test".to_string()); let observed_methods = server.observed_methods_snapshot().await; assert!(observed_methods.iter().any(|method| return method == "getHealth")); assert!(observed_methods.iter().any(|method| return method == "getVersion")); assert!(observed_methods.iter().any(|method| return method == "getSlot")); assert!(observed_methods.iter().any(|method| return method == "getBlockHeight")); assert!(observed_methods.iter().any(|method| return method == "getLatestBlockhash")); 
assert!(observed_methods.iter().any(|method| return method == "getBalance")); assert!(observed_methods.iter().any(|method| return method == "getSignaturesForAddress")); assert!(observed_methods.iter().any(|method| return method == "getTransaction")); assert!(observed_methods.iter().any(|method| return method == "sendTransaction")); server.shutdown().await; } #[tokio::test] async fn raw_helpers_send_expected_methods() { let server = TestHttpServer::spawn().await; let endpoint = make_http_endpoint(server.url.clone()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let _ = client.get_health_raw().await.expect("get_health_raw must succeed"); let _ = client.get_version_raw().await.expect("get_version_raw must succeed"); let _ = client.get_slot_raw(None).await.expect("get_slot_raw must succeed"); let _ = client .get_block_height_raw(None) .await .expect("get_block_height_raw must succeed"); let _ = client .get_latest_blockhash_raw(None) .await .expect("get_latest_blockhash_raw must succeed"); let _ = client .get_balance_raw(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_balance_raw must succeed"); let _ = client .get_account_info_raw(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_account_info_raw must succeed"); let _ = client .get_program_accounts_raw(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_program_accounts_raw must succeed"); let _ = client .get_signatures_for_address_raw(crate::SYSTEM_PROGRAM_ID.to_string(), None) .await .expect("get_signatures_for_address_raw must succeed"); let _ = client .get_transaction_raw( "5h6xBEauJ3PK6SWC7r7J2W8mE1D7aQj4J6Jg8n1SmWnVqSg9H6gq2K7xwJkL2GZ2RZ6n9wYk9cW1b2V3a4d5e6f7".to_string(), None, ) .await .expect("get_transaction_raw must succeed"); let _ = client .send_transaction_raw("AAAA".to_string(), None) .await .expect("send_transaction_raw must succeed"); let observed_methods = server.observed_methods_snapshot().await; 
assert!(observed_methods.iter().any(|method| return method == "getHealth")); assert!(observed_methods.iter().any(|method| return method == "getVersion")); assert!(observed_methods.iter().any(|method| return method == "getSlot")); assert!(observed_methods.iter().any(|method| return method == "getBlockHeight")); assert!(observed_methods.iter().any(|method| return method == "getLatestBlockhash")); assert!(observed_methods.iter().any(|method| return method == "getBalance")); assert!(observed_methods.iter().any(|method| return method == "getAccountInfo")); assert!(observed_methods.iter().any(|method| return method == "getProgramAccounts")); assert!(observed_methods.iter().any(|method| return method == "getSignaturesForAddress")); assert!(observed_methods.iter().any(|method| return method == "getTransaction")); assert!(observed_methods.iter().any(|method| return method == "sendTransaction")); server.shutdown().await; } #[tokio::test] async fn http_429_triggers_local_pause() { let server = TestHttpServer::spawn().await; let endpoint = make_http_endpoint(server.url.clone()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let result = client .execute_json_rpc_request_raw("rateLimitMe".to_string(), std::vec::Vec::new()) .await; assert!(result.is_err()); let paused_status = client.endpoint_status().await; match paused_status { crate::HttpEndpointStatus::Paused { remaining_ms } => { assert!(remaining_ms > 0); }, other => { panic!("unexpected status after 429: {other:?}"); }, } tokio::time::sleep(std::time::Duration::from_millis(1100)).await; let resumed_status = client.endpoint_status().await; assert_eq!(resumed_status, crate::HttpEndpointStatus::Active); server.shutdown().await; } #[tokio::test] async fn disabled_endpoint_rejects_requests() { let endpoint = make_http_endpoint("http://127.0.0.1:65535".to_string()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); client.disable().await; let result = 
client.get_health_raw().await; assert!(result.is_err()); let error = result.expect_err("disabled endpoint must reject requests"); match error { crate::Error::Http(message) => { assert!(message.contains("is disabled")); }, other => { panic!("unexpected error: {other:?}"); }, } } #[tokio::test] async fn unknown_method_returns_error() { let server = TestHttpServer::spawn().await; let endpoint = make_http_endpoint(server.url.clone()); let client = crate::HttpClient::new(endpoint).expect("client creation must succeed"); let result = client .execute_json_rpc_request_raw("unknownMethod".to_string(), std::vec::Vec::new()) .await; assert!(result.is_err()); let error = result.expect_err("unknown method must fail"); match error { crate::Error::Http(message) => { assert!(message.contains("Method not found")); }, other => { panic!("unexpected error: {other:?}"); }, } server.shutdown().await; } }