From 60b884189575b3a501faa08869b6916d986bcbb4 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sat, 25 Apr 2026 22:53:36 +0200 Subject: [PATCH] 0.7.1 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 25 +- kb_lib/src/http_pool.rs | 17 ++ kb_lib/src/lib.rs | 7 + kb_lib/src/tx_resolution.rs | 472 ++++++++++++++++++++++++++++++++++++ kb_lib/src/ws_client.rs | 85 ++++++- kb_lib/src/ws_manager.rs | 101 ++++++++ 8 files changed, 695 insertions(+), 15 deletions(-) create mode 100644 kb_lib/src/tx_resolution.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bc3288..5ab9f7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,3 +30,4 @@ 0.6.4 - Premières règles de détection technique pour candidats pools/listings depuis programNotification en s’appuyant sur les DEX connus en base 0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection 0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent +0.7.0 - Ajout du socle de résolution transactionnelle orientée DEX : relais WS vers file de résolution, récupération getTransaction via HttpEndpointPool et persistance des résolutions dans les observations/signaux diff --git a/Cargo.toml b/Cargo.toml index f88990e..120c162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.6" +version = "0.7.0" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 0d379e3..185e448 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -448,14 +448,13 @@ Réalisé : - amélioration des messages de log UI pour les actions idempotentes déjà démarrées ou déjà arrêtées. ### 6.032. Version `0.7.0` — Résolution transactionnelle orientée DEX -Objectif : relier la détection temps réel aux transactions Solana complètes. +Réalisé : -À faire : - -- introduire une file de résolution transactionnelle alimentée par les signatures issues des flux WS, -- corréler `logsNotification`, `programNotification` et `signatureNotification` avec des appels `getTransaction`, -- utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS, -- éviter qu’une notification intéressante reste au niveau d’un simple signal technique sans résolution métier. +- introduction d’une file de résolution transactionnelle alimentée par les signatures issues des flux WS utiles, +- corrélation initiale des `logsNotification` et `signatureNotification` avec des appels `getTransaction`, +- utilisation du pool HTTP existant pour enrichir les signaux détectés côté WS, +- persistance des résolutions transactionnelles dans `kb_onchain_observations` et `kb_analysis_signals`, +- préparation du futur modèle transactionnel enrichi sans bloquer les flux temps réel. ### 6.033. Version `0.7.1` — Modèle transactionnel Solana enrichi Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slot -> signature -> instruction`. @@ -657,10 +656,10 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate La priorité immédiate est désormais la suivante : -1. démarrer la version `0.7.0` avec la résolution transactionnelle orientée DEX, -2. introduire une file de signatures ou de résolutions alimentée par les flux WS utiles, -3. corréler `logsNotification`, `programNotification` et `signatureNotification` avec des appels `getTransaction`, -4. utiliser le pool HTTP existant pour enrichir les signaux détectés côté WS, -5. préparer ensuite la version `0.7.1` pour le modèle transactionnel Solana enrichi, -6. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage. +1. démarrer la version `0.7.1` avec le modèle transactionnel Solana enrichi, +2. préparer les structures et tables reliant slots, signatures et instructions, +3. distinguer clairement transaction principale et inner instructions, +4. préparer l’historique transactionnel exploitable par les futurs décodeurs DEX, +5. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage, +6. préparer ensuite la version `0.7.2` pour les décodeurs DEX spécifiques par programme et version. diff --git a/kb_lib/src/http_pool.rs b/kb_lib/src/http_pool.rs index 3c404a5..4eb53cf 100644 --- a/kb_lib/src/http_pool.rs +++ b/kb_lib/src/http_pool.rs @@ -261,6 +261,23 @@ impl HttpEndpointPool { ) .await } + + /// Executes `getTransaction` through the pool and returns the raw result value. + pub async fn get_transaction_raw_for_role( + &self, + required_role: &str, + signature: std::string::String, + config: std::option::Option, + ) -> Result { + let client_result = self + .select_client_for_role_and_method(required_role, "getTransaction") + .await; + let client = match client_result { + Ok(client) => client, + Err(error) => return Err(error), + }; + client.get_transaction_raw(signature, config).await + } } fn kb_pool_build_optional_config_only_params( diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 875ab31..b9989fc 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -21,6 +21,7 @@ mod http_client; mod http_pool; mod db; mod detect; +mod tx_resolution; pub use crate::constants::*; pub use crate::error::KbError; @@ -168,3 +169,9 @@ pub use crate::detect::KbWsDetectionRelay; pub use crate::detect::KbWsDetectionRelayStats; pub use crate::detect::KbDetectionPoolCandidateInput; pub use crate::detect::KbDetectionPoolCandidateResult; +pub use crate::tx_resolution::KbTransactionResolutionOutcome; +pub use crate::tx_resolution::KbTransactionResolutionRequest; +pub use crate::tx_resolution::KbTransactionResolutionService; +pub use crate::tx_resolution::KbWsTransactionResolutionEnvelope; +pub use crate::tx_resolution::KbWsTransactionResolutionRelay; +pub use crate::tx_resolution::KbWsTransactionResolutionRelayStats; diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs new file mode 100644 index 0000000..0a72271 --- /dev/null +++ b/kb_lib/src/tx_resolution.rs @@ -0,0 +1,472 @@ +// file: kb_lib/src/tx_resolution.rs + +//! Transaction resolution pipeline. +//! +//! This module bridges useful WebSocket notifications carrying a signature +//! with HTTP `getTransaction` resolution through the existing endpoint pool. + +/// One transaction resolution request built from a WebSocket notification. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbTransactionResolutionRequest { + /// Transaction signature to resolve. + pub signature: std::string::String, + /// Notification method that triggered the request. + pub trigger_method: std::string::String, + /// Optional source endpoint logical name. + pub source_endpoint_name: std::option::Option, + /// Optional slot hint extracted from the triggering notification. + pub slot_hint: std::option::Option, + /// Raw triggering notification payload. + pub trigger_payload: serde_json::Value, +} + +/// One forwarded WebSocket notification envelope for transaction resolution. +#[derive(Debug, Clone)] +pub struct KbWsTransactionResolutionEnvelope { + /// Optional source endpoint logical name. + pub endpoint_name: std::option::Option, + /// Raw JSON-RPC notification. + pub notification: crate::KbJsonRpcWsNotification, + /// Optional matched subscription metadata. + pub subscription: std::option::Option, +} + +impl KbWsTransactionResolutionEnvelope { + /// Creates a new transaction-resolution envelope. + pub fn new( + endpoint_name: std::option::Option, + notification: crate::KbJsonRpcWsNotification, + subscription: std::option::Option, + ) -> Self { + Self { + endpoint_name, + notification, + subscription, + } + } +} + +/// Result of one transaction resolution pass. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KbTransactionResolutionOutcome { + /// The notification does not produce a transaction resolution request. + Ignored, + /// The transaction was successfully resolved and persisted. + Resolved { + /// Signature that was resolved. + signature: std::string::String, + /// Persisted observation id. + observation_id: i64, + /// Persisted signal id. + signal_id: i64, + }, + /// The transaction lookup returned `null`. + Missing { + /// Signature that was queried. + signature: std::string::String, + /// Persisted observation id. + observation_id: i64, + /// Persisted signal id. + signal_id: i64, + }, + /// One HTTP resolution error was converted into a signal. + ErrorSignaled { + /// Signature that failed to resolve. + signature: std::string::String, + /// Persisted signal id. + signal_id: i64, + }, +} + +/// Runtime statistics for one transaction resolution relay worker. +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct KbWsTransactionResolutionRelayStats { + /// Number of received envelopes. + pub received_count: u64, + /// Number of ignored envelopes. + pub ignored_count: u64, + /// Number of successfully resolved transactions. + pub resolved_count: u64, + /// Number of missing transactions. + pub missing_count: u64, + /// Number of error signals produced. + pub error_count: u64, +} + +/// Transaction resolution service. +#[derive(Debug, Clone)] +pub struct KbTransactionResolutionService { + http_pool: std::sync::Arc, + persistence: crate::KbDetectionPersistenceService, + http_role: std::string::String, + resolved_signatures: + std::sync::Arc>>, +} + +impl KbTransactionResolutionService { + /// Creates a new transaction resolution service. + pub fn new( + http_pool: std::sync::Arc, + persistence: crate::KbDetectionPersistenceService, + http_role: std::string::String, + ) -> Self { + Self { + http_pool, + persistence, + http_role, + resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashSet::new(), + )), + } + } + + /// Returns the persistence façade used by the resolver. + pub fn persistence(&self) -> &crate::KbDetectionPersistenceService { + &self.persistence + } + + /// Builds one transaction resolution request from one WS envelope. + pub fn try_build_request_from_ws_envelope( + &self, + envelope: &crate::KbWsTransactionResolutionEnvelope, + ) -> std::option::Option { + let method = envelope.notification.method.as_str(); + if method != "logsNotification" + && method != "signatureNotification" + && method != "programNotification" + { + return None; + } + let signature_option = + kb_extract_resolution_signature(&envelope.notification, envelope.subscription.as_ref()); + let signature = match signature_option { + Some(signature) => signature, + None => return None, + }; + let slot_hint = kb_extract_resolution_slot(&envelope.notification); + Some(crate::KbTransactionResolutionRequest { + signature, + trigger_method: envelope.notification.method.clone(), + source_endpoint_name: envelope.endpoint_name.clone(), + slot_hint, + trigger_payload: serde_json::json!({ + "jsonrpc": envelope.notification.jsonrpc, + "method": envelope.notification.method, + "subscription": envelope.notification.params.subscription, + "result": envelope.notification.params.result, + }), + }) + } + + /// Processes one forwarded WS envelope. + pub async fn process_ws_envelope( + &self, + envelope: &crate::KbWsTransactionResolutionEnvelope, + ) -> Result { + let request_option = self.try_build_request_from_ws_envelope(envelope); + let request = match request_option { + Some(request) => request, + None => return Ok(crate::KbTransactionResolutionOutcome::Ignored), + }; + { + let resolved_guard = self.resolved_signatures.lock().await; + if resolved_guard.contains(request.signature.as_str()) { + return Ok(crate::KbTransactionResolutionOutcome::Ignored); + } + } + let outcome_result = self.resolve_request(&request).await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => { + let signal_id_result = self.record_resolution_error_signal(&request, &error).await; + let signal_id = match signal_id_result { + Ok(signal_id) => signal_id, + Err(signal_error) => return Err(signal_error), + }; + return Ok(crate::KbTransactionResolutionOutcome::ErrorSignaled { + signature: request.signature.clone(), + signal_id, + }); + } + }; + match &outcome { + crate::KbTransactionResolutionOutcome::Resolved { signature, .. } => { + let mut resolved_guard = self.resolved_signatures.lock().await; + resolved_guard.insert(signature.clone()); + } + crate::KbTransactionResolutionOutcome::Missing { signature, .. } => { + let mut resolved_guard = self.resolved_signatures.lock().await; + resolved_guard.insert(signature.clone()); + } + crate::KbTransactionResolutionOutcome::Ignored => {} + crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => {} + } + Ok(outcome) + } + + async fn resolve_request( + &self, + request: &crate::KbTransactionResolutionRequest, + ) -> Result { + let config = Some(serde_json::json!({ + "encoding": "jsonParsed", + "maxSupportedTransactionVersion": 0 + })); + let transaction_value_result = self + .http_pool + .get_transaction_raw_for_role( + self.http_role.as_str(), + request.signature.clone(), + config, + ) + .await; + let transaction_value = match transaction_value_result { + Ok(transaction_value) => transaction_value, + Err(error) => return Err(error), + }; + if transaction_value.is_null() { + let payload = serde_json::json!({ + "status": "missing", + "signature": request.signature.clone(), + "triggerMethod": request.trigger_method.clone(), + "sourceEndpointName": request.source_endpoint_name.clone(), + "slotHint": request.slot_hint, + "triggerPayload": request.trigger_payload.clone(), + "transaction": serde_json::Value::Null + }); + let observation_id_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "http.transaction_resolution".to_string(), + crate::KbObservationSourceKind::HttpRpc, + request.source_endpoint_name.clone(), + request.signature.clone(), + request.slot_hint, + payload.clone(), + )) + .await; + let observation_id = match observation_id_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_id_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + "signal.transaction_resolution.missing".to_string(), + crate::KbAnalysisSignalSeverity::Medium, + request.signature.clone(), + Some(observation_id), + None, + payload, + )) + .await; + let signal_id = match signal_id_result { + Ok(signal_id) => signal_id, + Err(error) => return Err(error), + }; + return Ok(crate::KbTransactionResolutionOutcome::Missing { + signature: request.signature.clone(), + observation_id, + signal_id, + }); + } + let resolved_slot = transaction_value + .get("slot") + .and_then(serde_json::Value::as_u64) + .or(request.slot_hint); + let payload = serde_json::json!({ + "status": "resolved", + "signature": request.signature.clone(), + "triggerMethod": request.trigger_method.clone(), + "sourceEndpointName": request.source_endpoint_name.clone(), + "slotHint": request.slot_hint, + "triggerPayload": request.trigger_payload.clone(), + "transaction": transaction_value + }); + let observation_id_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "http.transaction_resolution".to_string(), + crate::KbObservationSourceKind::HttpRpc, + request.source_endpoint_name.clone(), + request.signature.clone(), + resolved_slot, + payload.clone(), + )) + .await; + let observation_id = match observation_id_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_id_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + "signal.transaction_resolution.resolved".to_string(), + crate::KbAnalysisSignalSeverity::Low, + request.signature.clone(), + Some(observation_id), + None, + payload, + )) + .await; + let signal_id = match signal_id_result { + Ok(signal_id) => signal_id, + Err(error) => return Err(error), + }; + Ok(crate::KbTransactionResolutionOutcome::Resolved { + signature: request.signature.clone(), + observation_id, + signal_id, + }) + } + + async fn record_resolution_error_signal( + &self, + request: &crate::KbTransactionResolutionRequest, + error: &crate::KbError, + ) -> Result { + let payload = serde_json::json!({ + "status": "error", + "signature": request.signature.clone(), + "triggerMethod": request.trigger_method.clone(), + "sourceEndpointName": request.source_endpoint_name.clone(), + "slotHint": request.slot_hint, + "triggerPayload": request.trigger_payload.clone(), + "error": error.to_string() + }); + self.persistence + .record_signal(&crate::KbDetectionSignalInput::new( + "signal.transaction_resolution.error".to_string(), + crate::KbAnalysisSignalSeverity::High, + request.signature.clone(), + None, + None, + payload, + )) + .await + } +} + +/// Relay that consumes forwarded WS notifications and resolves matching +/// signatures through HTTP `getTransaction`. +#[derive(Debug, Clone)] +pub struct KbWsTransactionResolutionRelay { + resolver: crate::KbTransactionResolutionService, +} + +impl KbWsTransactionResolutionRelay { + /// Creates a new transaction resolution relay. + pub fn new(resolver: crate::KbTransactionResolutionService) -> Self { + Self { resolver } + } + + /// Creates a bounded relay channel. + pub fn channel( + capacity: usize, + ) -> ( + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, + ) { + tokio::sync::mpsc::channel(capacity) + } + + /// Processes one forwarded envelope. + pub async fn process_envelope( + &self, + envelope: &crate::KbWsTransactionResolutionEnvelope, + ) -> Result { + self.resolver.process_ws_envelope(envelope).await + } + + /// Spawns one background relay worker. + pub fn spawn( + self, + mut receiver: tokio::sync::mpsc::Receiver, + ) -> tokio::task::JoinHandle { + tokio::spawn(async move { + let mut stats = crate::KbWsTransactionResolutionRelayStats::default(); + loop { + let recv_result = receiver.recv().await; + let envelope = match recv_result { + Some(envelope) => envelope, + None => break, + }; + stats.received_count += 1; + let outcome_result = self.process_envelope(&envelope).await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => { + stats.error_count += 1; + tracing::error!( + target: "kb_lib::tx_resolution", + "transaction resolution relay failed endpoint_name={:?}: {}", + envelope.endpoint_name, + error + ); + continue; + } + }; + match outcome { + crate::KbTransactionResolutionOutcome::Ignored => { + stats.ignored_count += 1; + } + crate::KbTransactionResolutionOutcome::Resolved { .. } => { + stats.resolved_count += 1; + } + crate::KbTransactionResolutionOutcome::Missing { .. } => { + stats.missing_count += 1; + } + crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => { + stats.error_count += 1; + } + } + } + stats + }) + } +} + +fn kb_extract_resolution_signature( + notification: &crate::KbJsonRpcWsNotification, + subscription: std::option::Option<&crate::WsSubscriptionInfo>, +) -> std::option::Option { + let result = ¬ification.params.result; + if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) { + return Some(signature.to_string()); + } + if let Some(value) = result.get("value") { + if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) { + return Some(signature.to_string()); + } + } + if notification.method.as_str() == "signatureNotification" { + if let Some(subscription) = subscription { + if let Some(first_param) = subscription.params.first() { + if let Some(signature) = first_param.as_str() { + return Some(signature.to_string()); + } + } + } + } + None +} + +fn kb_extract_resolution_slot( + notification: &crate::KbJsonRpcWsNotification, +) -> std::option::Option { + let result = ¬ification.params.result; + if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + if let Some(context) = result.get("context") { + if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + } + if let Some(value) = result.get("value") { + if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + } + None +} diff --git a/kb_lib/src/ws_client.rs b/kb_lib/src/ws_client.rs index 1952018..5aa1b52 100644 --- a/kb_lib/src/ws_client.rs +++ b/kb_lib/src/ws_client.rs @@ -240,6 +240,13 @@ pub struct WsClient { >, >, >, + transaction_resolution_notification_forwarder: std::sync::Arc< + tokio::sync::Mutex< + std::option::Option< + tokio::sync::mpsc::Sender, + >, + >, + >, } impl WsClient { @@ -267,6 +274,9 @@ impl WsClient { runtime: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRuntime::new())), registry: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRegistry::new())), detection_notification_forwarder: std::sync::Arc::new(tokio::sync::Mutex::new(None)), + transaction_resolution_notification_forwarder: std::sync::Arc::new( + tokio::sync::Mutex::new(None), + ), }) } @@ -382,7 +392,6 @@ impl WsClient { return Err(error); } }; - let (ws_stream, _response) = match connect_result { Ok(parts) => parts, Err(error) => { @@ -1158,6 +1167,7 @@ impl WsClient { .get(&subscription_id) .cloned() }; + let resolution_subscription = matched_subscription_option.clone(); match matched_subscription_option { Some(subscription) => { let method_matches_registry = @@ -1182,6 +1192,13 @@ impl WsClient { notification, ) .await; + forward_transaction_resolution_notification_if_configured( + &self.transaction_resolution_notification_forwarder, + self.endpoint.name.as_str(), + notification, + resolution_subscription, + ) + .await; } } } @@ -1365,6 +1382,27 @@ impl WsClient { ); } } + + /// Sets the optional transaction-resolution notification forwarder. + pub async fn set_transaction_resolution_notification_forwarder( + &self, + sender: tokio::sync::mpsc::Sender, + ) { + let mut guard = self + .transaction_resolution_notification_forwarder + .lock() + .await; + *guard = Some(sender); + } + + /// Clears the optional transaction-resolution notification forwarder. + pub async fn clear_transaction_resolution_notification_forwarder(&self) { + let mut guard = self + .transaction_resolution_notification_forwarder + .lock() + .await; + *guard = None; + } } fn kb_convert_outgoing_message( @@ -1528,6 +1566,51 @@ async fn forward_detection_notification_if_configured( } } +async fn forward_transaction_resolution_notification_if_configured( + forwarder: &std::sync::Arc< + tokio::sync::Mutex< + std::option::Option< + tokio::sync::mpsc::Sender, + >, + >, + >, + endpoint_name: &str, + notification: &crate::KbJsonRpcWsNotification, + subscription: std::option::Option, +) { + let sender_option = { + let guard = forwarder.lock().await; + guard.clone() + }; + let sender = match sender_option { + Some(sender) => sender, + None => return, + }; + let envelope = crate::KbWsTransactionResolutionEnvelope::new( + Some(endpoint_name.to_string()), + notification.clone(), + subscription, + ); + let send_result = sender.try_send(envelope); + match send_result { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + target: "kb_lib::ws_client", + "transaction resolution relay queue is full endpoint_name={}", + endpoint_name + ); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + tracing::warn!( + target: "kb_lib::ws_client", + "transaction resolution relay queue is closed endpoint_name={}", + endpoint_name + ); + } + } +} + #[cfg(test)] mod tests { use futures_util::SinkExt; diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs index a976a21..36263ad 100644 --- a/kb_lib/src/ws_manager.rs +++ b/kb_lib/src/ws_manager.rs @@ -48,6 +48,11 @@ pub struct WsManager { std::option::Option>, >, detection_relay_abort_handle: tokio::sync::Mutex>, + transaction_resolution_relay_sender: tokio::sync::Mutex< + std::option::Option>, + >, + transaction_resolution_relay_abort_handle: + tokio::sync::Mutex>, } impl WsManager { @@ -97,6 +102,8 @@ impl WsManager { event_tx, detection_relay_sender: tokio::sync::Mutex::new(None), detection_relay_abort_handle: tokio::sync::Mutex::new(None), + transaction_resolution_relay_sender: tokio::sync::Mutex::new(None), + transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None), }) } @@ -202,6 +209,15 @@ impl WsManager { if let Some(sender) = sender_option { client.set_detection_notification_forwarder(sender).await; } + let tx_resolution_sender_option = { + let sender_guard = self.transaction_resolution_relay_sender.lock().await; + sender_guard.clone() + }; + if let Some(sender) = tx_resolution_sender_option { + client + .set_transaction_resolution_notification_forwarder(sender) + .await; + } let connect_result = client.connect().await; if let Err(error) = connect_result { return Err(error); @@ -236,6 +252,9 @@ impl WsManager { return Ok(false); } client.clear_detection_notification_forwarder().await; + client + .clear_transaction_resolution_notification_forwarder() + .await; let disconnect_result = client.disconnect().await; if let Err(error) = disconnect_result { return Err(error); @@ -433,6 +452,82 @@ impl WsManager { }; Ok(client.connection_state().await) } + + /// Attaches one shared transaction-resolution relay to all managed clients. + pub async fn attach_transaction_resolution_relay( + &self, + database: std::sync::Arc, + http_pool: std::sync::Arc, + http_role: std::string::String, + queue_capacity: usize, + ) -> Result<(), crate::KbError> { + { + let sender_guard = self.transaction_resolution_relay_sender.lock().await; + if sender_guard.is_some() { + return Err(crate::KbError::InvalidState( + "websocket transaction resolution relay is already attached".to_string(), + )); + } + } + let persistence = crate::KbDetectionPersistenceService::new(database); + let resolver = + crate::KbTransactionResolutionService::new(http_pool, persistence, http_role); + let relay = crate::KbWsTransactionResolutionRelay::new(resolver); + let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity); + let relay_task = relay.spawn(receiver); + let relay_abort_handle = relay_task.abort_handle(); + { + let mut sender_guard = self.transaction_resolution_relay_sender.lock().await; + *sender_guard = Some(sender.clone()); + } + { + let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await; + *abort_guard = Some(relay_abort_handle); + } + let clients = { + let clients_guard = self.clients.lock().await; + let mut values = std::vec::Vec::new(); + for managed in clients_guard.values() { + values.push(managed.client.clone()); + } + values + }; + for client in clients { + client + .set_transaction_resolution_notification_forwarder(sender.clone()) + .await; + } + Ok(()) + } + + /// Detaches the shared transaction-resolution relay from all managed clients. + pub async fn detach_transaction_resolution_relay(&self) -> Result<(), crate::KbError> { + let clients = { + let clients_guard = self.clients.lock().await; + let mut values = std::vec::Vec::new(); + for managed in clients_guard.values() { + values.push(managed.client.clone()); + } + values + }; + for client in clients { + client + .clear_transaction_resolution_notification_forwarder() + .await; + } + { + let mut sender_guard = self.transaction_resolution_relay_sender.lock().await; + *sender_guard = None; + } + let abort_handle_option = { + let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await; + abort_guard.take() + }; + if let Some(abort_handle) = abort_handle_option { + abort_handle.abort(); + } + Ok(()) + } } impl Drop for WsManager { @@ -449,6 +544,12 @@ impl Drop for WsManager { abort_handle.abort(); } } + let tx_resolution_abort_result = self.transaction_resolution_relay_abort_handle.try_lock(); + if let Ok(mut abort_guard) = tx_resolution_abort_result { + if let Some(abort_handle) = abort_guard.take() { + abort_handle.abort(); + } + } } }