0.6.2
This commit is contained in:
210
kb_lib/src/detect/ws_relay.rs
Normal file
210
kb_lib/src/detect/ws_relay.rs
Normal file
@@ -0,0 +1,210 @@
|
||||
// file: kb_lib/src/detect/ws_relay.rs
|
||||
|
||||
//! WebSocket detection relay.
|
||||
//!
|
||||
//! This module receives normalized WebSocket JSON-RPC notifications from the
|
||||
//! transport layer and forwards them to the Solana WS detection service.
|
||||
|
||||
/// One forwarded WebSocket notification ready for detection.
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct KbWsDetectionNotificationEnvelope {
|
||||
/// Optional logical endpoint name.
|
||||
pub endpoint_name: std::option::Option<std::string::String>,
|
||||
/// The parsed JSON-RPC notification.
|
||||
pub notification: crate::KbJsonRpcWsNotification,
|
||||
}
|
||||
|
||||
impl KbWsDetectionNotificationEnvelope {
|
||||
/// Creates a new notification envelope.
|
||||
pub fn new(
|
||||
endpoint_name: std::option::Option<std::string::String>,
|
||||
notification: crate::KbJsonRpcWsNotification,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint_name,
|
||||
notification,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runtime statistics for one relay worker.
|
||||
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
|
||||
pub struct KbWsDetectionRelayStats {
|
||||
/// Number of received envelopes.
|
||||
pub received_count: u64,
|
||||
/// Number of ignored notifications.
|
||||
pub ignored_count: u64,
|
||||
/// Number of stored observations.
|
||||
pub observation_count: u64,
|
||||
/// Number of registered token candidates.
|
||||
pub token_candidate_count: u64,
|
||||
/// Number of processing errors.
|
||||
pub error_count: u64,
|
||||
}
|
||||
|
||||
/// Asynchronous relay between `WsClient` notifications and the detection service.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KbWsDetectionRelay {
|
||||
/// Solana WS detection service.
|
||||
detector: crate::KbSolanaWsDetectionService,
|
||||
}
|
||||
|
||||
impl KbWsDetectionRelay {
|
||||
/// Creates a new relay.
|
||||
pub fn new(detector: crate::KbSolanaWsDetectionService) -> Self {
|
||||
Self { detector }
|
||||
}
|
||||
|
||||
/// Creates a bounded relay channel.
|
||||
pub fn channel(
|
||||
capacity: usize,
|
||||
) -> (
|
||||
tokio::sync::mpsc::Sender<crate::KbWsDetectionNotificationEnvelope>,
|
||||
tokio::sync::mpsc::Receiver<crate::KbWsDetectionNotificationEnvelope>,
|
||||
) {
|
||||
tokio::sync::mpsc::channel(capacity)
|
||||
}
|
||||
|
||||
/// Processes one forwarded notification.
|
||||
pub async fn process_envelope(
|
||||
&self,
|
||||
envelope: &crate::KbWsDetectionNotificationEnvelope,
|
||||
) -> Result<crate::KbSolanaWsDetectionOutcome, crate::KbError> {
|
||||
self.detector
|
||||
.process_notification(envelope.endpoint_name.clone(), &envelope.notification)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Spawns one background relay worker.
|
||||
pub fn spawn(
|
||||
self,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<crate::KbWsDetectionNotificationEnvelope>,
|
||||
) -> tokio::task::JoinHandle<crate::KbWsDetectionRelayStats> {
|
||||
tokio::spawn(async move {
|
||||
let mut stats = crate::KbWsDetectionRelayStats::default();
|
||||
loop {
|
||||
let recv_result = receiver.recv().await;
|
||||
let envelope = match recv_result {
|
||||
Some(envelope) => envelope,
|
||||
None => break,
|
||||
};
|
||||
stats.received_count += 1;
|
||||
let outcome_result = self.process_envelope(&envelope).await;
|
||||
let outcome = match outcome_result {
|
||||
Ok(outcome) => outcome,
|
||||
Err(error) => {
|
||||
stats.error_count += 1;
|
||||
tracing::error!(
|
||||
target: "kb_lib::detect::ws_relay",
|
||||
"ws detection relay processing failed endpoint_name={:?}: {}",
|
||||
envelope.endpoint_name,
|
||||
error
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match outcome {
|
||||
crate::KbSolanaWsDetectionOutcome::Ignored => {
|
||||
stats.ignored_count += 1;
|
||||
}
|
||||
crate::KbSolanaWsDetectionOutcome::ObservationRecorded { .. } => {
|
||||
stats.observation_count += 1;
|
||||
}
|
||||
crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { .. } => {
|
||||
stats.token_candidate_count += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
stats
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
async fn create_database() -> crate::KbDatabase {
|
||||
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
|
||||
let database_path = tempdir.path().join("ws_relay.sqlite3");
|
||||
let config = crate::KbDatabaseConfig {
|
||||
enabled: true,
|
||||
backend: crate::KbDatabaseBackend::Sqlite,
|
||||
sqlite: crate::KbSqliteDatabaseConfig {
|
||||
path: database_path.to_string_lossy().to_string(),
|
||||
create_if_missing: true,
|
||||
busy_timeout_ms: 5000,
|
||||
max_connections: 1,
|
||||
auto_initialize_schema: true,
|
||||
use_wal: true,
|
||||
},
|
||||
};
|
||||
crate::KbDatabase::connect_and_initialize(&config)
|
||||
.await
|
||||
.expect("database init must succeed")
|
||||
}
|
||||
|
||||
fn build_slot_notification() -> crate::KbJsonRpcWsNotification {
|
||||
crate::KbJsonRpcWsNotification {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
method: "slotNotification".to_string(),
|
||||
params: crate::KbJsonRpcWsNotificationParams {
|
||||
result: serde_json::json!({
|
||||
"slot": 414726860_u64,
|
||||
"parent": 414726859_u64,
|
||||
"root": 414726828_u64
|
||||
}),
|
||||
subscription: 1008_u64,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn relay_process_envelope_records_observation() {
|
||||
let database = create_database().await;
|
||||
let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
|
||||
let detector = crate::KbSolanaWsDetectionService::new(persistence);
|
||||
let relay = crate::KbWsDetectionRelay::new(detector);
|
||||
let envelope = crate::KbWsDetectionNotificationEnvelope::new(
|
||||
Some("mainnet_public_ws_slots".to_string()),
|
||||
build_slot_notification(),
|
||||
);
|
||||
let outcome_result = relay.process_envelope(&envelope).await;
|
||||
let outcome = match outcome_result {
|
||||
Ok(outcome) => outcome,
|
||||
Err(error) => panic!("process_envelope failed: {error}"),
|
||||
};
|
||||
match outcome {
|
||||
crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => {
|
||||
assert!(observation_id > 0);
|
||||
}
|
||||
_ => panic!("unexpected relay outcome"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn relay_worker_processes_channel() {
|
||||
let database = create_database().await;
|
||||
let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
|
||||
let detector = crate::KbSolanaWsDetectionService::new(persistence);
|
||||
let relay = crate::KbWsDetectionRelay::new(detector);
|
||||
let (sender, receiver) = crate::KbWsDetectionRelay::channel(8);
|
||||
let handle = relay.spawn(receiver);
|
||||
let send_result = sender
|
||||
.send(crate::KbWsDetectionNotificationEnvelope::new(
|
||||
Some("mainnet_public_ws_slots".to_string()),
|
||||
build_slot_notification(),
|
||||
))
|
||||
.await;
|
||||
if let Err(error) = send_result {
|
||||
panic!("send failed: {error}");
|
||||
}
|
||||
drop(sender);
|
||||
let join_result = handle.await;
|
||||
let stats = match join_result {
|
||||
Ok(stats) => stats,
|
||||
Err(error) => panic!("join failed: {error}"),
|
||||
};
|
||||
assert_eq!(stats.received_count, 1);
|
||||
assert_eq!(stats.observation_count, 1);
|
||||
assert_eq!(stats.error_count, 0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user