0.7.1
This commit is contained in:
@@ -261,6 +261,23 @@ impl HttpEndpointPool {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Executes `getTransaction` through the pool and returns the raw result value.
|
||||
pub async fn get_transaction_raw_for_role(
|
||||
&self,
|
||||
required_role: &str,
|
||||
signature: std::string::String,
|
||||
config: std::option::Option<serde_json::Value>,
|
||||
) -> Result<serde_json::Value, crate::KbError> {
|
||||
let client_result = self
|
||||
.select_client_for_role_and_method(required_role, "getTransaction")
|
||||
.await;
|
||||
let client = match client_result {
|
||||
Ok(client) => client,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
client.get_transaction_raw(signature, config).await
|
||||
}
|
||||
}
|
||||
|
||||
fn kb_pool_build_optional_config_only_params(
|
||||
|
||||
@@ -21,6 +21,7 @@ mod http_client;
|
||||
mod http_pool;
|
||||
mod db;
|
||||
mod detect;
|
||||
mod tx_resolution;
|
||||
|
||||
pub use crate::constants::*;
|
||||
pub use crate::error::KbError;
|
||||
@@ -168,3 +169,9 @@ pub use crate::detect::KbWsDetectionRelay;
|
||||
pub use crate::detect::KbWsDetectionRelayStats;
|
||||
pub use crate::detect::KbDetectionPoolCandidateInput;
|
||||
pub use crate::detect::KbDetectionPoolCandidateResult;
|
||||
pub use crate::tx_resolution::KbTransactionResolutionOutcome;
|
||||
pub use crate::tx_resolution::KbTransactionResolutionRequest;
|
||||
pub use crate::tx_resolution::KbTransactionResolutionService;
|
||||
pub use crate::tx_resolution::KbWsTransactionResolutionEnvelope;
|
||||
pub use crate::tx_resolution::KbWsTransactionResolutionRelay;
|
||||
pub use crate::tx_resolution::KbWsTransactionResolutionRelayStats;
|
||||
|
||||
472
kb_lib/src/tx_resolution.rs
Normal file
472
kb_lib/src/tx_resolution.rs
Normal file
@@ -0,0 +1,472 @@
|
||||
// file: kb_lib/src/tx_resolution.rs
|
||||
|
||||
//! Transaction resolution pipeline.
|
||||
//!
|
||||
//! This module bridges useful WebSocket notifications carrying a signature
|
||||
//! with HTTP `getTransaction` resolution through the existing endpoint pool.
|
||||
|
||||
/// One transaction resolution request built from a WebSocket notification.
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct KbTransactionResolutionRequest {
|
||||
/// Transaction signature to resolve.
|
||||
pub signature: std::string::String,
|
||||
/// Notification method that triggered the request.
|
||||
pub trigger_method: std::string::String,
|
||||
/// Optional source endpoint logical name.
|
||||
pub source_endpoint_name: std::option::Option<std::string::String>,
|
||||
/// Optional slot hint extracted from the triggering notification.
|
||||
pub slot_hint: std::option::Option<u64>,
|
||||
/// Raw triggering notification payload.
|
||||
pub trigger_payload: serde_json::Value,
|
||||
}
|
||||
|
||||
/// One forwarded WebSocket notification envelope for transaction resolution.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KbWsTransactionResolutionEnvelope {
|
||||
/// Optional source endpoint logical name.
|
||||
pub endpoint_name: std::option::Option<std::string::String>,
|
||||
/// Raw JSON-RPC notification.
|
||||
pub notification: crate::KbJsonRpcWsNotification,
|
||||
/// Optional matched subscription metadata.
|
||||
pub subscription: std::option::Option<crate::WsSubscriptionInfo>,
|
||||
}
|
||||
|
||||
impl KbWsTransactionResolutionEnvelope {
|
||||
/// Creates a new transaction-resolution envelope.
|
||||
pub fn new(
|
||||
endpoint_name: std::option::Option<std::string::String>,
|
||||
notification: crate::KbJsonRpcWsNotification,
|
||||
subscription: std::option::Option<crate::WsSubscriptionInfo>,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint_name,
|
||||
notification,
|
||||
subscription,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of one transaction resolution pass.
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub enum KbTransactionResolutionOutcome {
|
||||
/// The notification does not produce a transaction resolution request.
|
||||
Ignored,
|
||||
/// The transaction was successfully resolved and persisted.
|
||||
Resolved {
|
||||
/// Signature that was resolved.
|
||||
signature: std::string::String,
|
||||
/// Persisted observation id.
|
||||
observation_id: i64,
|
||||
/// Persisted signal id.
|
||||
signal_id: i64,
|
||||
},
|
||||
/// The transaction lookup returned `null`.
|
||||
Missing {
|
||||
/// Signature that was queried.
|
||||
signature: std::string::String,
|
||||
/// Persisted observation id.
|
||||
observation_id: i64,
|
||||
/// Persisted signal id.
|
||||
signal_id: i64,
|
||||
},
|
||||
/// One HTTP resolution error was converted into a signal.
|
||||
ErrorSignaled {
|
||||
/// Signature that failed to resolve.
|
||||
signature: std::string::String,
|
||||
/// Persisted signal id.
|
||||
signal_id: i64,
|
||||
},
|
||||
}
|
||||
|
||||
/// Runtime statistics for one transaction resolution relay worker.
|
||||
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct KbWsTransactionResolutionRelayStats {
|
||||
/// Number of received envelopes.
|
||||
pub received_count: u64,
|
||||
/// Number of ignored envelopes.
|
||||
pub ignored_count: u64,
|
||||
/// Number of successfully resolved transactions.
|
||||
pub resolved_count: u64,
|
||||
/// Number of missing transactions.
|
||||
pub missing_count: u64,
|
||||
/// Number of error signals produced.
|
||||
pub error_count: u64,
|
||||
}
|
||||
|
||||
/// Transaction resolution service.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KbTransactionResolutionService {
|
||||
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
|
||||
persistence: crate::KbDetectionPersistenceService,
|
||||
http_role: std::string::String,
|
||||
resolved_signatures:
|
||||
std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<std::string::String>>>,
|
||||
}
|
||||
|
||||
impl KbTransactionResolutionService {
|
||||
/// Creates a new transaction resolution service.
|
||||
pub fn new(
|
||||
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
|
||||
persistence: crate::KbDetectionPersistenceService,
|
||||
http_role: std::string::String,
|
||||
) -> Self {
|
||||
Self {
|
||||
http_pool,
|
||||
persistence,
|
||||
http_role,
|
||||
resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new(
|
||||
std::collections::HashSet::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the persistence façade used by the resolver.
|
||||
pub fn persistence(&self) -> &crate::KbDetectionPersistenceService {
|
||||
&self.persistence
|
||||
}
|
||||
|
||||
/// Builds one transaction resolution request from one WS envelope.
|
||||
pub fn try_build_request_from_ws_envelope(
|
||||
&self,
|
||||
envelope: &crate::KbWsTransactionResolutionEnvelope,
|
||||
) -> std::option::Option<crate::KbTransactionResolutionRequest> {
|
||||
let method = envelope.notification.method.as_str();
|
||||
if method != "logsNotification"
|
||||
&& method != "signatureNotification"
|
||||
&& method != "programNotification"
|
||||
{
|
||||
return None;
|
||||
}
|
||||
let signature_option =
|
||||
kb_extract_resolution_signature(&envelope.notification, envelope.subscription.as_ref());
|
||||
let signature = match signature_option {
|
||||
Some(signature) => signature,
|
||||
None => return None,
|
||||
};
|
||||
let slot_hint = kb_extract_resolution_slot(&envelope.notification);
|
||||
Some(crate::KbTransactionResolutionRequest {
|
||||
signature,
|
||||
trigger_method: envelope.notification.method.clone(),
|
||||
source_endpoint_name: envelope.endpoint_name.clone(),
|
||||
slot_hint,
|
||||
trigger_payload: serde_json::json!({
|
||||
"jsonrpc": envelope.notification.jsonrpc,
|
||||
"method": envelope.notification.method,
|
||||
"subscription": envelope.notification.params.subscription,
|
||||
"result": envelope.notification.params.result,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
/// Processes one forwarded WS envelope.
|
||||
pub async fn process_ws_envelope(
|
||||
&self,
|
||||
envelope: &crate::KbWsTransactionResolutionEnvelope,
|
||||
) -> Result<crate::KbTransactionResolutionOutcome, crate::KbError> {
|
||||
let request_option = self.try_build_request_from_ws_envelope(envelope);
|
||||
let request = match request_option {
|
||||
Some(request) => request,
|
||||
None => return Ok(crate::KbTransactionResolutionOutcome::Ignored),
|
||||
};
|
||||
{
|
||||
let resolved_guard = self.resolved_signatures.lock().await;
|
||||
if resolved_guard.contains(request.signature.as_str()) {
|
||||
return Ok(crate::KbTransactionResolutionOutcome::Ignored);
|
||||
}
|
||||
}
|
||||
let outcome_result = self.resolve_request(&request).await;
|
||||
let outcome = match outcome_result {
|
||||
Ok(outcome) => outcome,
|
||||
Err(error) => {
|
||||
let signal_id_result = self.record_resolution_error_signal(&request, &error).await;
|
||||
let signal_id = match signal_id_result {
|
||||
Ok(signal_id) => signal_id,
|
||||
Err(signal_error) => return Err(signal_error),
|
||||
};
|
||||
return Ok(crate::KbTransactionResolutionOutcome::ErrorSignaled {
|
||||
signature: request.signature.clone(),
|
||||
signal_id,
|
||||
});
|
||||
}
|
||||
};
|
||||
match &outcome {
|
||||
crate::KbTransactionResolutionOutcome::Resolved { signature, .. } => {
|
||||
let mut resolved_guard = self.resolved_signatures.lock().await;
|
||||
resolved_guard.insert(signature.clone());
|
||||
}
|
||||
crate::KbTransactionResolutionOutcome::Missing { signature, .. } => {
|
||||
let mut resolved_guard = self.resolved_signatures.lock().await;
|
||||
resolved_guard.insert(signature.clone());
|
||||
}
|
||||
crate::KbTransactionResolutionOutcome::Ignored => {}
|
||||
crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => {}
|
||||
}
|
||||
Ok(outcome)
|
||||
}
|
||||
|
||||
async fn resolve_request(
|
||||
&self,
|
||||
request: &crate::KbTransactionResolutionRequest,
|
||||
) -> Result<crate::KbTransactionResolutionOutcome, crate::KbError> {
|
||||
let config = Some(serde_json::json!({
|
||||
"encoding": "jsonParsed",
|
||||
"maxSupportedTransactionVersion": 0
|
||||
}));
|
||||
let transaction_value_result = self
|
||||
.http_pool
|
||||
.get_transaction_raw_for_role(
|
||||
self.http_role.as_str(),
|
||||
request.signature.clone(),
|
||||
config,
|
||||
)
|
||||
.await;
|
||||
let transaction_value = match transaction_value_result {
|
||||
Ok(transaction_value) => transaction_value,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
if transaction_value.is_null() {
|
||||
let payload = serde_json::json!({
|
||||
"status": "missing",
|
||||
"signature": request.signature.clone(),
|
||||
"triggerMethod": request.trigger_method.clone(),
|
||||
"sourceEndpointName": request.source_endpoint_name.clone(),
|
||||
"slotHint": request.slot_hint,
|
||||
"triggerPayload": request.trigger_payload.clone(),
|
||||
"transaction": serde_json::Value::Null
|
||||
});
|
||||
let observation_id_result = self
|
||||
.persistence
|
||||
.record_observation(&crate::KbDetectionObservationInput::new(
|
||||
"http.transaction_resolution".to_string(),
|
||||
crate::KbObservationSourceKind::HttpRpc,
|
||||
request.source_endpoint_name.clone(),
|
||||
request.signature.clone(),
|
||||
request.slot_hint,
|
||||
payload.clone(),
|
||||
))
|
||||
.await;
|
||||
let observation_id = match observation_id_result {
|
||||
Ok(observation_id) => observation_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let signal_id_result = self
|
||||
.persistence
|
||||
.record_signal(&crate::KbDetectionSignalInput::new(
|
||||
"signal.transaction_resolution.missing".to_string(),
|
||||
crate::KbAnalysisSignalSeverity::Medium,
|
||||
request.signature.clone(),
|
||||
Some(observation_id),
|
||||
None,
|
||||
payload,
|
||||
))
|
||||
.await;
|
||||
let signal_id = match signal_id_result {
|
||||
Ok(signal_id) => signal_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
return Ok(crate::KbTransactionResolutionOutcome::Missing {
|
||||
signature: request.signature.clone(),
|
||||
observation_id,
|
||||
signal_id,
|
||||
});
|
||||
}
|
||||
let resolved_slot = transaction_value
|
||||
.get("slot")
|
||||
.and_then(serde_json::Value::as_u64)
|
||||
.or(request.slot_hint);
|
||||
let payload = serde_json::json!({
|
||||
"status": "resolved",
|
||||
"signature": request.signature.clone(),
|
||||
"triggerMethod": request.trigger_method.clone(),
|
||||
"sourceEndpointName": request.source_endpoint_name.clone(),
|
||||
"slotHint": request.slot_hint,
|
||||
"triggerPayload": request.trigger_payload.clone(),
|
||||
"transaction": transaction_value
|
||||
});
|
||||
let observation_id_result = self
|
||||
.persistence
|
||||
.record_observation(&crate::KbDetectionObservationInput::new(
|
||||
"http.transaction_resolution".to_string(),
|
||||
crate::KbObservationSourceKind::HttpRpc,
|
||||
request.source_endpoint_name.clone(),
|
||||
request.signature.clone(),
|
||||
resolved_slot,
|
||||
payload.clone(),
|
||||
))
|
||||
.await;
|
||||
let observation_id = match observation_id_result {
|
||||
Ok(observation_id) => observation_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let signal_id_result = self
|
||||
.persistence
|
||||
.record_signal(&crate::KbDetectionSignalInput::new(
|
||||
"signal.transaction_resolution.resolved".to_string(),
|
||||
crate::KbAnalysisSignalSeverity::Low,
|
||||
request.signature.clone(),
|
||||
Some(observation_id),
|
||||
None,
|
||||
payload,
|
||||
))
|
||||
.await;
|
||||
let signal_id = match signal_id_result {
|
||||
Ok(signal_id) => signal_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
Ok(crate::KbTransactionResolutionOutcome::Resolved {
|
||||
signature: request.signature.clone(),
|
||||
observation_id,
|
||||
signal_id,
|
||||
})
|
||||
}
|
||||
|
||||
async fn record_resolution_error_signal(
|
||||
&self,
|
||||
request: &crate::KbTransactionResolutionRequest,
|
||||
error: &crate::KbError,
|
||||
) -> Result<i64, crate::KbError> {
|
||||
let payload = serde_json::json!({
|
||||
"status": "error",
|
||||
"signature": request.signature.clone(),
|
||||
"triggerMethod": request.trigger_method.clone(),
|
||||
"sourceEndpointName": request.source_endpoint_name.clone(),
|
||||
"slotHint": request.slot_hint,
|
||||
"triggerPayload": request.trigger_payload.clone(),
|
||||
"error": error.to_string()
|
||||
});
|
||||
self.persistence
|
||||
.record_signal(&crate::KbDetectionSignalInput::new(
|
||||
"signal.transaction_resolution.error".to_string(),
|
||||
crate::KbAnalysisSignalSeverity::High,
|
||||
request.signature.clone(),
|
||||
None,
|
||||
None,
|
||||
payload,
|
||||
))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Relay that consumes forwarded WS notifications and resolves matching
|
||||
/// signatures through HTTP `getTransaction`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KbWsTransactionResolutionRelay {
|
||||
resolver: crate::KbTransactionResolutionService,
|
||||
}
|
||||
|
||||
impl KbWsTransactionResolutionRelay {
|
||||
/// Creates a new transaction resolution relay.
|
||||
pub fn new(resolver: crate::KbTransactionResolutionService) -> Self {
|
||||
Self { resolver }
|
||||
}
|
||||
|
||||
/// Creates a bounded relay channel.
|
||||
pub fn channel(
|
||||
capacity: usize,
|
||||
) -> (
|
||||
tokio::sync::mpsc::Sender<crate::KbWsTransactionResolutionEnvelope>,
|
||||
tokio::sync::mpsc::Receiver<crate::KbWsTransactionResolutionEnvelope>,
|
||||
) {
|
||||
tokio::sync::mpsc::channel(capacity)
|
||||
}
|
||||
|
||||
/// Processes one forwarded envelope.
|
||||
pub async fn process_envelope(
|
||||
&self,
|
||||
envelope: &crate::KbWsTransactionResolutionEnvelope,
|
||||
) -> Result<crate::KbTransactionResolutionOutcome, crate::KbError> {
|
||||
self.resolver.process_ws_envelope(envelope).await
|
||||
}
|
||||
|
||||
/// Spawns one background relay worker.
|
||||
pub fn spawn(
|
||||
self,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<crate::KbWsTransactionResolutionEnvelope>,
|
||||
) -> tokio::task::JoinHandle<crate::KbWsTransactionResolutionRelayStats> {
|
||||
tokio::spawn(async move {
|
||||
let mut stats = crate::KbWsTransactionResolutionRelayStats::default();
|
||||
loop {
|
||||
let recv_result = receiver.recv().await;
|
||||
let envelope = match recv_result {
|
||||
Some(envelope) => envelope,
|
||||
None => break,
|
||||
};
|
||||
stats.received_count += 1;
|
||||
let outcome_result = self.process_envelope(&envelope).await;
|
||||
let outcome = match outcome_result {
|
||||
Ok(outcome) => outcome,
|
||||
Err(error) => {
|
||||
stats.error_count += 1;
|
||||
tracing::error!(
|
||||
target: "kb_lib::tx_resolution",
|
||||
"transaction resolution relay failed endpoint_name={:?}: {}",
|
||||
envelope.endpoint_name,
|
||||
error
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match outcome {
|
||||
crate::KbTransactionResolutionOutcome::Ignored => {
|
||||
stats.ignored_count += 1;
|
||||
}
|
||||
crate::KbTransactionResolutionOutcome::Resolved { .. } => {
|
||||
stats.resolved_count += 1;
|
||||
}
|
||||
crate::KbTransactionResolutionOutcome::Missing { .. } => {
|
||||
stats.missing_count += 1;
|
||||
}
|
||||
crate::KbTransactionResolutionOutcome::ErrorSignaled { .. } => {
|
||||
stats.error_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
stats
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn kb_extract_resolution_signature(
|
||||
notification: &crate::KbJsonRpcWsNotification,
|
||||
subscription: std::option::Option<&crate::WsSubscriptionInfo>,
|
||||
) -> std::option::Option<std::string::String> {
|
||||
let result = ¬ification.params.result;
|
||||
if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) {
|
||||
return Some(signature.to_string());
|
||||
}
|
||||
if let Some(value) = result.get("value") {
|
||||
if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) {
|
||||
return Some(signature.to_string());
|
||||
}
|
||||
}
|
||||
if notification.method.as_str() == "signatureNotification" {
|
||||
if let Some(subscription) = subscription {
|
||||
if let Some(first_param) = subscription.params.first() {
|
||||
if let Some(signature) = first_param.as_str() {
|
||||
return Some(signature.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn kb_extract_resolution_slot(
|
||||
notification: &crate::KbJsonRpcWsNotification,
|
||||
) -> std::option::Option<u64> {
|
||||
let result = ¬ification.params.result;
|
||||
if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) {
|
||||
return Some(slot);
|
||||
}
|
||||
if let Some(context) = result.get("context") {
|
||||
if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) {
|
||||
return Some(slot);
|
||||
}
|
||||
}
|
||||
if let Some(value) = result.get("value") {
|
||||
if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) {
|
||||
return Some(slot);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -240,6 +240,13 @@ pub struct WsClient {
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
transaction_resolution_notification_forwarder: std::sync::Arc<
|
||||
tokio::sync::Mutex<
|
||||
std::option::Option<
|
||||
tokio::sync::mpsc::Sender<crate::KbWsTransactionResolutionEnvelope>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl WsClient {
|
||||
@@ -267,6 +274,9 @@ impl WsClient {
|
||||
runtime: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRuntime::new())),
|
||||
registry: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRegistry::new())),
|
||||
detection_notification_forwarder: std::sync::Arc::new(tokio::sync::Mutex::new(None)),
|
||||
transaction_resolution_notification_forwarder: std::sync::Arc::new(
|
||||
tokio::sync::Mutex::new(None),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -382,7 +392,6 @@ impl WsClient {
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
|
||||
let (ws_stream, _response) = match connect_result {
|
||||
Ok(parts) => parts,
|
||||
Err(error) => {
|
||||
@@ -1158,6 +1167,7 @@ impl WsClient {
|
||||
.get(&subscription_id)
|
||||
.cloned()
|
||||
};
|
||||
let resolution_subscription = matched_subscription_option.clone();
|
||||
match matched_subscription_option {
|
||||
Some(subscription) => {
|
||||
let method_matches_registry =
|
||||
@@ -1182,6 +1192,13 @@ impl WsClient {
|
||||
notification,
|
||||
)
|
||||
.await;
|
||||
forward_transaction_resolution_notification_if_configured(
|
||||
&self.transaction_resolution_notification_forwarder,
|
||||
self.endpoint.name.as_str(),
|
||||
notification,
|
||||
resolution_subscription,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1365,6 +1382,27 @@ impl WsClient {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the optional transaction-resolution notification forwarder.
|
||||
pub async fn set_transaction_resolution_notification_forwarder(
|
||||
&self,
|
||||
sender: tokio::sync::mpsc::Sender<crate::KbWsTransactionResolutionEnvelope>,
|
||||
) {
|
||||
let mut guard = self
|
||||
.transaction_resolution_notification_forwarder
|
||||
.lock()
|
||||
.await;
|
||||
*guard = Some(sender);
|
||||
}
|
||||
|
||||
/// Clears the optional transaction-resolution notification forwarder.
|
||||
pub async fn clear_transaction_resolution_notification_forwarder(&self) {
|
||||
let mut guard = self
|
||||
.transaction_resolution_notification_forwarder
|
||||
.lock()
|
||||
.await;
|
||||
*guard = None;
|
||||
}
|
||||
}
|
||||
|
||||
fn kb_convert_outgoing_message(
|
||||
@@ -1528,6 +1566,51 @@ async fn forward_detection_notification_if_configured(
|
||||
}
|
||||
}
|
||||
|
||||
async fn forward_transaction_resolution_notification_if_configured(
|
||||
forwarder: &std::sync::Arc<
|
||||
tokio::sync::Mutex<
|
||||
std::option::Option<
|
||||
tokio::sync::mpsc::Sender<crate::KbWsTransactionResolutionEnvelope>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
endpoint_name: &str,
|
||||
notification: &crate::KbJsonRpcWsNotification,
|
||||
subscription: std::option::Option<crate::WsSubscriptionInfo>,
|
||||
) {
|
||||
let sender_option = {
|
||||
let guard = forwarder.lock().await;
|
||||
guard.clone()
|
||||
};
|
||||
let sender = match sender_option {
|
||||
Some(sender) => sender,
|
||||
None => return,
|
||||
};
|
||||
let envelope = crate::KbWsTransactionResolutionEnvelope::new(
|
||||
Some(endpoint_name.to_string()),
|
||||
notification.clone(),
|
||||
subscription,
|
||||
);
|
||||
let send_result = sender.try_send(envelope);
|
||||
match send_result {
|
||||
Ok(()) => {}
|
||||
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
|
||||
tracing::warn!(
|
||||
target: "kb_lib::ws_client",
|
||||
"transaction resolution relay queue is full endpoint_name={}",
|
||||
endpoint_name
|
||||
);
|
||||
}
|
||||
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
|
||||
tracing::warn!(
|
||||
target: "kb_lib::ws_client",
|
||||
"transaction resolution relay queue is closed endpoint_name={}",
|
||||
endpoint_name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures_util::SinkExt;
|
||||
|
||||
@@ -48,6 +48,11 @@ pub struct WsManager {
|
||||
std::option::Option<tokio::sync::mpsc::Sender<crate::KbWsDetectionNotificationEnvelope>>,
|
||||
>,
|
||||
detection_relay_abort_handle: tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
|
||||
transaction_resolution_relay_sender: tokio::sync::Mutex<
|
||||
std::option::Option<tokio::sync::mpsc::Sender<crate::KbWsTransactionResolutionEnvelope>>,
|
||||
>,
|
||||
transaction_resolution_relay_abort_handle:
|
||||
tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
|
||||
}
|
||||
|
||||
impl WsManager {
|
||||
@@ -97,6 +102,8 @@ impl WsManager {
|
||||
event_tx,
|
||||
detection_relay_sender: tokio::sync::Mutex::new(None),
|
||||
detection_relay_abort_handle: tokio::sync::Mutex::new(None),
|
||||
transaction_resolution_relay_sender: tokio::sync::Mutex::new(None),
|
||||
transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -202,6 +209,15 @@ impl WsManager {
|
||||
if let Some(sender) = sender_option {
|
||||
client.set_detection_notification_forwarder(sender).await;
|
||||
}
|
||||
let tx_resolution_sender_option = {
|
||||
let sender_guard = self.transaction_resolution_relay_sender.lock().await;
|
||||
sender_guard.clone()
|
||||
};
|
||||
if let Some(sender) = tx_resolution_sender_option {
|
||||
client
|
||||
.set_transaction_resolution_notification_forwarder(sender)
|
||||
.await;
|
||||
}
|
||||
let connect_result = client.connect().await;
|
||||
if let Err(error) = connect_result {
|
||||
return Err(error);
|
||||
@@ -236,6 +252,9 @@ impl WsManager {
|
||||
return Ok(false);
|
||||
}
|
||||
client.clear_detection_notification_forwarder().await;
|
||||
client
|
||||
.clear_transaction_resolution_notification_forwarder()
|
||||
.await;
|
||||
let disconnect_result = client.disconnect().await;
|
||||
if let Err(error) = disconnect_result {
|
||||
return Err(error);
|
||||
@@ -433,6 +452,82 @@ impl WsManager {
|
||||
};
|
||||
Ok(client.connection_state().await)
|
||||
}
|
||||
|
||||
/// Attaches one shared transaction-resolution relay to all managed clients.
|
||||
pub async fn attach_transaction_resolution_relay(
|
||||
&self,
|
||||
database: std::sync::Arc<crate::KbDatabase>,
|
||||
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
|
||||
http_role: std::string::String,
|
||||
queue_capacity: usize,
|
||||
) -> Result<(), crate::KbError> {
|
||||
{
|
||||
let sender_guard = self.transaction_resolution_relay_sender.lock().await;
|
||||
if sender_guard.is_some() {
|
||||
return Err(crate::KbError::InvalidState(
|
||||
"websocket transaction resolution relay is already attached".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
let persistence = crate::KbDetectionPersistenceService::new(database);
|
||||
let resolver =
|
||||
crate::KbTransactionResolutionService::new(http_pool, persistence, http_role);
|
||||
let relay = crate::KbWsTransactionResolutionRelay::new(resolver);
|
||||
let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity);
|
||||
let relay_task = relay.spawn(receiver);
|
||||
let relay_abort_handle = relay_task.abort_handle();
|
||||
{
|
||||
let mut sender_guard = self.transaction_resolution_relay_sender.lock().await;
|
||||
*sender_guard = Some(sender.clone());
|
||||
}
|
||||
{
|
||||
let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await;
|
||||
*abort_guard = Some(relay_abort_handle);
|
||||
}
|
||||
let clients = {
|
||||
let clients_guard = self.clients.lock().await;
|
||||
let mut values = std::vec::Vec::new();
|
||||
for managed in clients_guard.values() {
|
||||
values.push(managed.client.clone());
|
||||
}
|
||||
values
|
||||
};
|
||||
for client in clients {
|
||||
client
|
||||
.set_transaction_resolution_notification_forwarder(sender.clone())
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Detaches the shared transaction-resolution relay from all managed clients.
|
||||
pub async fn detach_transaction_resolution_relay(&self) -> Result<(), crate::KbError> {
|
||||
let clients = {
|
||||
let clients_guard = self.clients.lock().await;
|
||||
let mut values = std::vec::Vec::new();
|
||||
for managed in clients_guard.values() {
|
||||
values.push(managed.client.clone());
|
||||
}
|
||||
values
|
||||
};
|
||||
for client in clients {
|
||||
client
|
||||
.clear_transaction_resolution_notification_forwarder()
|
||||
.await;
|
||||
}
|
||||
{
|
||||
let mut sender_guard = self.transaction_resolution_relay_sender.lock().await;
|
||||
*sender_guard = None;
|
||||
}
|
||||
let abort_handle_option = {
|
||||
let mut abort_guard = self.transaction_resolution_relay_abort_handle.lock().await;
|
||||
abort_guard.take()
|
||||
};
|
||||
if let Some(abort_handle) = abort_handle_option {
|
||||
abort_handle.abort();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WsManager {
|
||||
@@ -449,6 +544,12 @@ impl Drop for WsManager {
|
||||
abort_handle.abort();
|
||||
}
|
||||
}
|
||||
let tx_resolution_abort_result = self.transaction_resolution_relay_abort_handle.try_lock();
|
||||
if let Ok(mut abort_guard) = tx_resolution_abort_result {
|
||||
if let Some(abort_handle) = abort_guard.take() {
|
||||
abort_handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user