// file: kb_lib/src/tx_resolution.rs //! Transaction resolution pipeline. //! //! This module bridges useful WebSocket notifications carrying a signature //! with HTTP `getTransaction` resolution through the existing endpoint pool. /// One transaction resolution request built from a WebSocket notification. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TransactionResolutionRequest { /// Transaction signature to resolve. pub signature: std::string::String, /// Notification method that triggered the request. pub trigger_method: std::string::String, /// Optional source endpoint logical name. pub source_endpoint_name: std::option::Option, /// Optional slot hint extracted from the triggering notification. pub slot_hint: std::option::Option, /// Raw triggering notification payload. pub trigger_payload: serde_json::Value, } /// One forwarded WebSocket notification envelope for transaction resolution. #[derive(Debug, Clone)] pub struct WsTransactionResolutionEnvelope { /// Optional source endpoint logical name. pub endpoint_name: std::option::Option, /// Raw JSON-RPC notification. pub notification: crate::JsonRpcWsNotification, /// Optional matched subscription metadata. pub subscription: std::option::Option, } impl WsTransactionResolutionEnvelope { /// Creates a new transaction-resolution envelope. pub fn new( endpoint_name: std::option::Option, notification: crate::JsonRpcWsNotification, subscription: std::option::Option, ) -> Self { return Self { endpoint_name, notification, subscription, }; } } /// Result of one transaction resolution pass. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum TransactionResolutionOutcome { /// The notification does not produce a transaction resolution request. Ignored, /// The transaction was successfully resolved and persisted. Resolved { /// Signature that was resolved. signature: std::string::String, /// Persisted observation id. observation_id: i64, /// Persisted signal id. signal_id: i64, }, /// The transaction lookup returned `null`. Missing { /// Signature that was queried. signature: std::string::String, /// Persisted observation id. observation_id: i64, /// Persisted signal id. signal_id: i64, }, /// One HTTP resolution error was converted into a signal. ErrorSignaled { /// Signature that failed to resolve. signature: std::string::String, /// Persisted signal id. signal_id: i64, }, } /// Runtime statistics for one transaction resolution relay worker. #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] pub struct WsTransactionResolutionRelayStats { /// Number of received envelopes. pub received_count: u64, /// Number of ignored envelopes. pub ignored_count: u64, /// Number of successfully resolved transactions. pub resolved_count: u64, /// Number of missing transactions. pub missing_count: u64, /// Number of error signals produced. pub error_count: u64, } /// Transaction resolution service. #[derive(Debug, Clone)] pub struct TransactionResolutionService { http_pool: std::sync::Arc, persistence: crate::DetectionPersistenceService, http_role: std::string::String, transaction_model: crate::TransactionModelService, dex_decode_service: crate::DexDecodeService, dex_detect_service: crate::DexDetectService, launch_origin_service: crate::LaunchOriginService, pool_origin_service: crate::PoolOriginService, wallet_observation_service: crate::WalletObservationService, non_trade_materialization_service: crate::NonTradeEventMaterializationService, trade_aggregation_service: crate::TradeAggregationService, wallet_holding_observation_service: crate::WalletHoldingObservationService, pair_candle_aggregation_service: crate::PairCandleAggregationService, pair_analytic_signal_service: crate::PairAnalyticSignalService, transaction_classification_service: crate::TransactionClassificationService, resolved_signatures: std::sync::Arc>>, } impl TransactionResolutionService { /// Creates a new transaction resolution service. pub fn new( http_pool: std::sync::Arc, database: std::sync::Arc, http_role: std::string::String, ) -> Self { let persistence = crate::DetectionPersistenceService::new(database.clone()); let transaction_model = crate::TransactionModelService::new(database.clone()); let dex_decode_service = crate::DexDecodeService::new(database.clone()); let dex_detect_service = crate::DexDetectService::new(database.clone()); let launch_origin_service = crate::LaunchOriginService::new(database.clone()); let pool_origin_service = crate::PoolOriginService::new(database.clone()); let wallet_observation_service = crate::WalletObservationService::new(database.clone()); let non_trade_materialization_service = crate::NonTradeEventMaterializationService::new(database.clone()); let trade_aggregation_service = crate::TradeAggregationService::new(database.clone()); let wallet_holding_observation_service = crate::WalletHoldingObservationService::new(database.clone()); let pair_candle_aggregation_service = crate::PairCandleAggregationService::new(database.clone()); let pair_analytic_signal_service = crate::PairAnalyticSignalService::new(database.clone()); let transaction_classification_service = crate::TransactionClassificationService::new(database.clone()); return Self { http_pool, persistence, http_role, transaction_model, dex_decode_service, dex_detect_service, launch_origin_service, pool_origin_service, wallet_observation_service, non_trade_materialization_service, trade_aggregation_service, wallet_holding_observation_service, pair_candle_aggregation_service, pair_analytic_signal_service, transaction_classification_service, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), )), }; } /// Returns the persistence façade used by the resolver. pub fn persistence(&self) -> &crate::DetectionPersistenceService { return &self.persistence; } /// Builds one transaction resolution request from one WS envelope. pub fn try_build_request_from_ws_envelope( &self, envelope: &crate::WsTransactionResolutionEnvelope, ) -> std::option::Option { let method = envelope.notification.method.as_str(); if method != "logsNotification" && method != "signatureNotification" && method != "programNotification" { return None; } let signature_option = extract_resolution_signature(&envelope.notification, envelope.subscription.as_ref()); let signature = match signature_option { Some(signature) => signature, None => return None, }; let slot_hint = extract_resolution_slot(&envelope.notification); return Some(crate::TransactionResolutionRequest { signature, trigger_method: envelope.notification.method.clone(), source_endpoint_name: envelope.endpoint_name.clone(), slot_hint, trigger_payload: serde_json::json!({ "jsonrpc": envelope.notification.jsonrpc, "method": envelope.notification.method, "subscription": envelope.notification.params.subscription, "result": envelope.notification.params.result, }), }); } /// Processes one forwarded WS envelope. pub async fn process_ws_envelope( &self, envelope: &crate::WsTransactionResolutionEnvelope, ) -> Result { let request_option = self.try_build_request_from_ws_envelope(envelope); let request = match request_option { Some(request) => request, None => return Ok(crate::TransactionResolutionOutcome::Ignored), }; { let resolved_guard = self.resolved_signatures.lock().await; if resolved_guard.contains(request.signature.as_str()) { return Ok(crate::TransactionResolutionOutcome::Ignored); } } let outcome_result = self.resolve_request(&request).await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => { let signal_id_result = self.record_resolution_error_signal(&request, &error).await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(signal_error) => return Err(signal_error), }; return Ok(crate::TransactionResolutionOutcome::ErrorSignaled { signature: request.signature.clone(), signal_id, }); }, }; match &outcome { crate::TransactionResolutionOutcome::Resolved { signature, .. } => { let mut resolved_guard = self.resolved_signatures.lock().await; resolved_guard.insert(signature.clone()); }, crate::TransactionResolutionOutcome::Missing { signature, .. } => { let mut resolved_guard = self.resolved_signatures.lock().await; resolved_guard.insert(signature.clone()); }, crate::TransactionResolutionOutcome::Ignored => {}, crate::TransactionResolutionOutcome::ErrorSignaled { .. } => {}, } return Ok(outcome); } async fn resolve_request( &self, request: &crate::TransactionResolutionRequest, ) -> Result { let config = Some(serde_json::json!({ "encoding": "jsonParsed", "maxSupportedTransactionVersion": 0 })); let transaction_value_result = self .http_pool .get_transaction_raw_for_role( self.http_role.as_str(), request.signature.clone(), config, ) .await; let transaction_value = match transaction_value_result { Ok(transaction_value) => transaction_value, Err(error) => return Err(error), }; if transaction_value.is_null() { let payload = serde_json::json!({ "status": "missing", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "triggerPayload": request.trigger_payload.clone(), "transaction": serde_json::Value::Null }); let observation_id_result = self .persistence .record_observation(&crate::DetectionObservationInput::new( "http.transaction_resolution".to_string(), crate::ObservationSourceKind::HttpRpc, request.source_endpoint_name.clone(), request.signature.clone(), request.slot_hint, payload.clone(), )) .await; let observation_id = match observation_id_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_id_result = self .persistence .record_signal(&crate::DetectionSignalInput::new( "signal.transaction_resolution.missing".to_string(), crate::AnalysisSignalSeverity::Medium, request.signature.clone(), Some(observation_id), None, payload, )) .await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(error) => return Err(error), }; return Ok(crate::TransactionResolutionOutcome::Missing { signature: request.signature.clone(), observation_id, signal_id, }); } let resolved_slot = transaction_value .get("slot") .and_then(serde_json::Value::as_u64) .or(request.slot_hint); let projection_result = self .transaction_model .persist_resolved_transaction( request.signature.as_str(), request.source_endpoint_name.clone(), &transaction_value, ) .await; let projected_transaction_id = match projection_result { Ok(projected_transaction_id) => projected_transaction_id, Err(error) => return Err(error), }; let decoded_events_result = self .dex_decode_service .decode_transaction_by_signature(request.signature.as_str()) .await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => return Err(error), }; let decoded_event_count = decoded_events.len(); let detection_results_result = self .dex_detect_service .detect_transaction_by_signature(request.signature.as_str()) .await; let detection_results = match detection_results_result { Ok(detection_results) => detection_results, Err(error) => return Err(error), }; let detected_object_count = detection_results.len(); let launch_attributions_result = self .launch_origin_service .attribute_transaction_by_signature(request.signature.as_str()) .await; let launch_attributions = match launch_attributions_result { Ok(launch_attributions) => launch_attributions, Err(error) => return Err(error), }; let launch_attribution_count = launch_attributions.len(); let pool_origins_result = self .pool_origin_service .record_transaction_by_signature(request.signature.as_str()) .await; let pool_origins = match pool_origins_result { Ok(pool_origins) => pool_origins, Err(error) => return Err(error), }; let pool_origin_count = pool_origins.len(); let wallet_observations_result = self .wallet_observation_service .record_transaction_by_signature(request.signature.as_str()) .await; let wallet_observations = match wallet_observations_result { Ok(wallet_observations) => wallet_observations, Err(error) => return Err(error), }; let wallet_participation_count = wallet_observations.len(); let wallet_holding_observations_result = self .wallet_holding_observation_service .record_transaction_by_signature(request.signature.as_str()) .await; let wallet_holding_observations = match wallet_holding_observations_result { Ok(wallet_holding_observations) => wallet_holding_observations, Err(error) => return Err(error), }; let wallet_holding_count = wallet_holding_observations.len(); let non_trade_materialization_result = self .non_trade_materialization_service .record_transaction_by_signature(request.signature.as_str()) .await; let non_trade_materialization = match non_trade_materialization_result { Ok(non_trade_materialization) => non_trade_materialization, Err(error) => return Err(error), }; let liquidity_event_count = non_trade_materialization.liquidity_event_count; let pool_lifecycle_event_count = non_trade_materialization.pool_lifecycle_event_count; let trade_aggregations_result = self .trade_aggregation_service .record_transaction_by_signature(request.signature.as_str()) .await; let trade_aggregations = match trade_aggregations_result { Ok(trade_aggregations) => trade_aggregations, Err(error) => return Err(error), }; let trade_event_count = trade_aggregations.len(); let pair_candle_aggregations_result = self .pair_candle_aggregation_service .record_transaction_by_signature(request.signature.as_str()) .await; let pair_candle_aggregations = match pair_candle_aggregations_result { Ok(pair_candle_aggregations) => pair_candle_aggregations, Err(error) => return Err(error), }; let pair_candle_count = pair_candle_aggregations.len(); let pair_analytic_signals_result = self .pair_analytic_signal_service .record_transaction_by_signature(request.signature.as_str()) .await; let pair_analytic_signals = match pair_analytic_signals_result { Ok(pair_analytic_signals) => pair_analytic_signals, Err(error) => return Err(error), }; let pair_analytic_signal_count = pair_analytic_signals.len(); let transaction_classification_result = self .transaction_classification_service .classify_transaction_by_signature(request.signature.as_str()) .await; let transaction_classification = match transaction_classification_result { Ok(transaction_classification) => transaction_classification, Err(error) => return Err(error), }; let transaction_classification_id = transaction_classification.id; let transaction_classification_kind = transaction_classification.classification_kind.clone(); let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "triggerPayload": request.trigger_payload.clone(), "slotHint": request.slot_hint, "projectedTransactionId": projected_transaction_id, "decodedEventCount": decoded_event_count, "detectedObjectCount": detected_object_count, "launchAttributionCount": launch_attribution_count, "poolOriginCount": pool_origin_count, "walletParticipationCount": wallet_participation_count, "walletHoldingCount": wallet_holding_count, "liquidityEventCount": liquidity_event_count, "poolLifecycleEventCount": pool_lifecycle_event_count, "tradeEventCount": trade_event_count, "pairCandleCount": pair_candle_count, "pairAnalyticSignalCount": pair_analytic_signal_count, "transactionClassificationId": transaction_classification_id, "transactionClassificationKind": transaction_classification_kind, "transaction": transaction_value }); let observation_id_result = self .persistence .record_observation(&crate::DetectionObservationInput::new( "http.transaction_resolution".to_string(), crate::ObservationSourceKind::HttpRpc, request.source_endpoint_name.clone(), request.signature.clone(), resolved_slot, payload.clone(), )) .await; let observation_id = match observation_id_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; let signal_id_result = self .persistence .record_signal(&crate::DetectionSignalInput::new( "signal.transaction_resolution.resolved".to_string(), crate::AnalysisSignalSeverity::Low, request.signature.clone(), Some(observation_id), None, payload, )) .await; let signal_id = match signal_id_result { Ok(signal_id) => signal_id, Err(error) => return Err(error), }; return Ok(crate::TransactionResolutionOutcome::Resolved { signature: request.signature.clone(), observation_id, signal_id, }); } async fn record_resolution_error_signal( &self, request: &crate::TransactionResolutionRequest, error: &crate::Error, ) -> Result { let payload = serde_json::json!({ "status": "error", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "triggerPayload": request.trigger_payload.clone(), "error": error.to_string() }); return self .persistence .record_signal(&crate::DetectionSignalInput::new( "signal.transaction_resolution.error".to_string(), crate::AnalysisSignalSeverity::High, request.signature.clone(), None, None, payload, )) .await; } } /// Relay that consumes forwarded WS notifications and resolves matching /// signatures through HTTP `getTransaction`. #[derive(Debug, Clone)] pub struct WsTransactionResolutionRelay { resolver: crate::TransactionResolutionService, } impl WsTransactionResolutionRelay { /// Creates a new transaction resolution relay. pub fn new(resolver: crate::TransactionResolutionService) -> Self { return Self { resolver }; } /// Creates a bounded relay channel. pub fn channel( capacity: usize, ) -> ( tokio::sync::mpsc::Sender, tokio::sync::mpsc::Receiver, ) { return tokio::sync::mpsc::channel(capacity); } /// Processes one forwarded envelope. pub async fn process_envelope( &self, envelope: &crate::WsTransactionResolutionEnvelope, ) -> Result { return self.resolver.process_ws_envelope(envelope).await; } /// Spawns one background relay worker. pub fn spawn( self, mut receiver: tokio::sync::mpsc::Receiver, ) -> tokio::task::JoinHandle { return tokio::spawn(async move { let mut stats = crate::WsTransactionResolutionRelayStats::default(); loop { let recv_result = receiver.recv().await; let envelope = match recv_result { Some(envelope) => envelope, None => break, }; stats.received_count += 1; let outcome_result = self.process_envelope(&envelope).await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => { stats.error_count += 1; tracing::error!( target: "tx_resolution", "transaction resolution relay failed endpoint_name={:?}: {}", envelope.endpoint_name, error ); continue; }, }; match outcome { crate::TransactionResolutionOutcome::Ignored => { stats.ignored_count += 1; }, crate::TransactionResolutionOutcome::Resolved { .. } => { stats.resolved_count += 1; }, crate::TransactionResolutionOutcome::Missing { .. } => { stats.missing_count += 1; }, crate::TransactionResolutionOutcome::ErrorSignaled { .. } => { stats.error_count += 1; }, } } return stats; }); } } fn extract_resolution_signature( notification: &crate::JsonRpcWsNotification, subscription: std::option::Option<&crate::WsSubscriptionInfo>, ) -> std::option::Option { let result = ¬ification.params.result; if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } if let Some(value) = result.get("value") { if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } } if notification.method.as_str() == "signatureNotification" { if let Some(subscription) = subscription { if let Some(first_param) = subscription.params.first() { if let Some(signature) = first_param.as_str() { return Some(signature.to_string()); } } } } return None; } fn extract_resolution_slot( notification: &crate::JsonRpcWsNotification, ) -> std::option::Option { let result = ¬ification.params.result; if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } if let Some(context) = result.get("context") { if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } if let Some(value) = result.get("value") { if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } return None; }