// file: kb_lib/src/tx_resolution.rs //! Transaction resolution pipeline. //! //! This module bridges useful WebSocket notifications carrying a signature //! with HTTP `getTransaction` resolution through the existing endpoint pool. /// One transaction resolution request built from a WebSocket notification. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct KbTransactionResolutionRequest { /// Transaction signature to resolve. pub signature: std::string::String, /// Notification method that triggered the request. pub trigger_method: std::string::String, /// Optional source endpoint logical name. pub source_endpoint_name: std::option::Option, /// Optional slot hint extracted from the triggering notification. pub slot_hint: std::option::Option, /// Raw triggering notification payload. pub trigger_payload: serde_json::Value, } /// One forwarded WebSocket notification envelope for transaction resolution. #[derive(Debug, Clone)] pub struct KbWsTransactionResolutionEnvelope { /// Optional source endpoint logical name. pub endpoint_name: std::option::Option, /// Raw JSON-RPC notification. pub notification: crate::KbJsonRpcWsNotification, /// Optional matched subscription metadata. pub subscription: std::option::Option, } impl KbWsTransactionResolutionEnvelope { /// Creates a new transaction-resolution envelope. pub fn new( endpoint_name: std::option::Option, notification: crate::KbJsonRpcWsNotification, subscription: std::option::Option, ) -> Self { Self { endpoint_name, notification, subscription, } } } /// Result of one transaction resolution pass. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum KbTransactionResolutionOutcome { /// The notification does not produce a transaction resolution request. Ignored, /// The transaction was successfully resolved and persisted. Resolved { /// Signature that was resolved. signature: std::string::String, /// Persisted observation id. observation_id: i64, /// Persisted signal id. signal_id: i64, }, /// The transaction lookup returned `null`. Missing { /// Signature that was queried. signature: std::string::String, /// Persisted observation id. observation_id: i64, /// Persisted signal id. signal_id: i64, }, /// One HTTP resolution error was converted into a signal. ErrorSignaled { /// Signature that failed to resolve. signature: std::string::String, /// Persisted signal id. signal_id: i64, }, } /// Runtime statistics for one transaction resolution relay worker. #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] pub struct KbWsTransactionResolutionRelayStats { /// Number of received envelopes. pub received_count: u64, /// Number of ignored envelopes. pub ignored_count: u64, /// Number of successfully resolved transactions. pub resolved_count: u64, /// Number of missing transactions. pub missing_count: u64, /// Number of error signals produced. pub error_count: u64, } /// Transaction resolution service. #[derive(Debug, Clone)] pub struct KbTransactionResolutionService { http_pool: std::sync::Arc, persistence: crate::KbDetectionPersistenceService, transaction_model: crate::KbTransactionModelService, dex_decode_service: crate::KbDexDecodeService, http_role: std::string::String, resolved_signatures: std::sync::Arc>>, } impl KbTransactionResolutionService { /// Creates a new transaction resolution service. pub fn new( http_pool: std::sync::Arc, database: std::sync::Arc, http_role: std::string::String, ) -> Self { let persistence = crate::KbDetectionPersistenceService::new(database.clone()); let transaction_model = crate::KbTransactionModelService::new(database.clone()); let dex_decode_service = crate::KbDexDecodeService::new(database); Self { http_pool, persistence, transaction_model, dex_decode_service, http_role, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), )), } } /// Returns the persistence façade used by the resolver. pub fn persistence(&self) -> &crate::KbDetectionPersistenceService { &self.persistence } /// Builds one transaction resolution request from one WS envelope. pub fn try_build_request_from_ws_envelope( &self, envelope: &crate::KbWsTransactionResolutionEnvelope, ) -> std::option::Option { let method = envelope.notification.method.as_str(); if method != "logsNotification" && method != "signatureNotification" && method != "programNotification" { return None; } let signature_option = kb_extract_resolution_signature(&envelope.notification, envelope.subscription.as_ref()); let signature = match signature_option { Some(signature) => signature, None => return None, }; let slot_hint = kb_extract_resolution_slot(&envelope.notification); Some(crate::KbTransactionResolutionRequest { signature, trigger_method: envelope.notification.method.clone(), source_endpoint_name: envelope.endpoint_name.clone(), slot_hint, trigger_payload: serde_json::json!({ "jsonrpc": envelope.notification.jsonrpc, "method": envelope.notification.method, "subscription": envelope.notification.params.subscription, "result": envelope.notification.params.result, }), }) } /// Processes one forwarded WS envelope. pub async fn process_ws_envelope( &self, envelope: &crate::KbWsTransactionResolutionEnvelope, ) -> Result { let request_option = self.try_build_request_from_ws_envelope(envelope); let request = match request_option { Some(request) => request, None => return Ok(crate::KbTransactionResolutionOutcome::Ignored), }; { let resolved_guard = self.resolved_signatures.lock().await; if resolved_guard.contains(request.signature.as_str()) { return Ok(crate::KbTransactionResolutionOutcome::Ignored); } } let outcome_result = self.resolve_request(&request).await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => { let signal_id_result = self.record_resolution_error_signal(&request, &error).await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(signal_error) => return Err(signal_error), }; return Ok(crate::KbTransactionResolutionOutcome::ErrorSignaled { signature: request.signature.clone(), signal_id, }); } }; match &outcome { crate::KbTransactionResolutionOutcome::Resolved { signature, .. } => { let mut resolved_guard = self.resolved_signatures.lock().await; resolved_guard.insert(signature.clone()); } crate::KbTransactionResolutionOutcome::Missing { signature, .. } => { let mut resolved_guard = self.resolved_signatures.lock().await; resolved_guard.insert(signature.clone()); } crate::KbTransactionResolutionOutcome::Ignored => {} crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => {} } Ok(outcome) } async fn resolve_request( &self, request: &crate::KbTransactionResolutionRequest, ) -> Result { let config = Some(serde_json::json!({ "encoding": "jsonParsed", "maxSupportedTransactionVersion": 0 })); let transaction_value_result = self .http_pool .get_transaction_raw_for_role( self.http_role.as_str(), request.signature.clone(), config, ) .await; let transaction_value = match transaction_value_result { Ok(transaction_value) => transaction_value, Err(error) => return Err(error), }; if transaction_value.is_null() { let payload = serde_json::json!({ "status": "missing", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "triggerPayload": request.trigger_payload.clone(), "transaction": serde_json::Value::Null }); let observation_id_result = self .persistence .record_observation(&crate::KbDetectionObservationInput::new( "http.transaction_resolution".to_string(), crate::KbObservationSourceKind::HttpRpc, request.source_endpoint_name.clone(), request.signature.clone(), request.slot_hint, payload.clone(), )) .await; let observation_id = match observation_id_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_id_result = self .persistence .record_signal(&crate::KbDetectionSignalInput::new( "signal.transaction_resolution.missing".to_string(), crate::KbAnalysisSignalSeverity::Medium, request.signature.clone(), Some(observation_id), None, payload, )) .await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(error) => return Err(error), }; return Ok(crate::KbTransactionResolutionOutcome::Missing { signature: request.signature.clone(), observation_id, signal_id, }); } let resolved_slot = transaction_value .get("slot") .and_then(serde_json::Value::as_u64) .or(request.slot_hint); let projection_result = self .transaction_model .persist_resolved_transaction( request.signature.as_str(), request.source_endpoint_name.clone(), &transaction_value, ) .await; let projected_transaction_id = match projection_result { Ok(projected_transaction_id) => projected_transaction_id, Err(error) => return Err(error), }; let decoded_events_result = self .dex_decode_service .decode_transaction_by_signature(request.signature.as_str()) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let decoded_event_count = decoded_events.len(); let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "projectedTransactionId": projected_transaction_id, "decodedEventCount": decoded_event_count, "triggerPayload": request.trigger_payload.clone(), "transaction": transaction_value }); let observation_id_result = self .persistence .record_observation(&crate::KbDetectionObservationInput::new( "http.transaction_resolution".to_string(), crate::KbObservationSourceKind::HttpRpc, request.source_endpoint_name.clone(), request.signature.clone(), resolved_slot, payload.clone(), )) .await; let observation_id = match observation_id_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_id_result = self .persistence .record_signal(&crate::KbDetectionSignalInput::new( "signal.transaction_resolution.resolved".to_string(), crate::KbAnalysisSignalSeverity::Low, request.signature.clone(), Some(observation_id), None, payload, )) .await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(error) => return Err(error), }; Ok(crate::KbTransactionResolutionOutcome::Resolved { signature: request.signature.clone(), observation_id, signal_id, }) } async fn record_resolution_error_signal( &self, request: &crate::KbTransactionResolutionRequest, error: &crate::KbError, ) -> Result { let payload = serde_json::json!({ "status": "error", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "triggerPayload": request.trigger_payload.clone(), "error": error.to_string() }); self.persistence .record_signal(&crate::KbDetectionSignalInput::new( "signal.transaction_resolution.error".to_string(), crate::KbAnalysisSignalSeverity::High, request.signature.clone(), None, None, payload, )) .await } } /// Relay that consumes forwarded WS notifications and resolves matching /// signatures through HTTP `getTransaction`. #[derive(Debug, Clone)] pub struct KbWsTransactionResolutionRelay { resolver: crate::KbTransactionResolutionService, } impl KbWsTransactionResolutionRelay { /// Creates a new transaction resolution relay. pub fn new(resolver: crate::KbTransactionResolutionService) -> Self { Self { resolver } } /// Creates a bounded relay channel. pub fn channel( capacity: usize, ) -> ( tokio::sync::mpsc::Sender, tokio::sync::mpsc::Receiver, ) { tokio::sync::mpsc::channel(capacity) } /// Processes one forwarded envelope. pub async fn process_envelope( &self, envelope: &crate::KbWsTransactionResolutionEnvelope, ) -> Result { self.resolver.process_ws_envelope(envelope).await } /// Spawns one background relay worker. pub fn spawn( self, mut receiver: tokio::sync::mpsc::Receiver, ) -> tokio::task::JoinHandle { tokio::spawn(async move { let mut stats = crate::KbWsTransactionResolutionRelayStats::default(); loop { let recv_result = receiver.recv().await; let envelope = match recv_result { Some(envelope) => envelope, None => break, }; stats.received_count += 1; let outcome_result = self.process_envelope(&envelope).await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => { stats.error_count += 1; tracing::error!( target: "kb_lib::tx_resolution", "transaction resolution relay failed endpoint_name={:?}: {}", envelope.endpoint_name, error ); continue; } }; match outcome { crate::KbTransactionResolutionOutcome::Ignored => { stats.ignored_count += 1; } crate::KbTransactionResolutionOutcome::Resolved { .. } => { stats.resolved_count += 1; } crate::KbTransactionResolutionOutcome::Missing { .. } => { stats.missing_count += 1; } crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => { stats.error_count += 1; } } } stats }) } } fn kb_extract_resolution_signature( notification: &crate::KbJsonRpcWsNotification, subscription: std::option::Option<&crate::WsSubscriptionInfo>, ) -> std::option::Option { let result = ¬ification.params.result; if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } if let Some(value) = result.get("value") { if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } } if notification.method.as_str() == "signatureNotification" { if let Some(subscription) = subscription { if let Some(first_param) = subscription.params.first() { if let Some(signature) = first_param.as_str() { return Some(signature.to_string()); } } } } None } fn kb_extract_resolution_slot( notification: &crate::KbJsonRpcWsNotification, ) -> std::option::Option { let result = ¬ification.params.result; if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } if let Some(context) = result.get("context") { if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } if let Some(value) = result.get("value") { if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } None }