diff --git a/CHANGELOG.md b/CHANGELOG.md index 79e713c..5d05125 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,4 +24,5 @@ 0.5.5 - Ajout des événements métier normalisés pour les swaps, liquidités, mints et burns de tokens 0.5.6 - Consolidation de la couche stockage : activation des foreign keys SQLite, lectures ciblées sur le modèle métier normalisé, index supplémentaires et tests unitaires dédiés 0.6.0 - Ajout du pipeline de détection technique : façade de persistance pour observations on-chain, signaux d’analyse et candidats tokens depuis les connecteurs RPC -0.6.1 - Ajout du bridge de détection Solana WS : notifications JSON-RPC persistées en observations, avec détection initiale des mints SPL / Token-2022 depuis programNotification +0.6.1 - Ajout du bridge de détection Solana WS : notifications JSON-RPC persistées en observations, avec détection initiale des mints SPL / Token-2022 depuis programNotification +0.6.2 - Branchement de WsClient vers le pipeline de détection via un relais asynchrone de notifications JSON-RPC WebSocket diff --git a/Cargo.toml b/Cargo.toml index 7c60452..6ce7614 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.1" +version = "0.6.2" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/kb_lib/src/detect.rs b/kb_lib/src/detect.rs index bef0f8a..515bde9 100644 --- a/kb_lib/src/detect.rs +++ b/kb_lib/src/detect.rs @@ -9,6 +9,7 @@ mod service; mod solana_ws; mod types; +mod ws_relay; pub use crate::detect::service::KbDetectionPersistenceService; pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome; @@ -17,3 +18,6 @@ pub use crate::detect::types::KbDetectionObservationInput; pub use crate::detect::types::KbDetectionSignalInput; pub use crate::detect::types::KbDetectionTokenCandidateInput; pub use crate::detect::types::KbDetectionTokenCandidateResult; +pub use crate::detect::ws_relay::KbWsDetectionNotificationEnvelope; +pub use crate::detect::ws_relay::KbWsDetectionRelay; +pub use crate::detect::ws_relay::KbWsDetectionRelayStats; diff --git a/kb_lib/src/detect/ws_relay.rs b/kb_lib/src/detect/ws_relay.rs new file mode 100644 index 0000000..7c907e2 --- /dev/null +++ b/kb_lib/src/detect/ws_relay.rs @@ -0,0 +1,210 @@ +// file: kb_lib/src/detect/ws_relay.rs + +//! WebSocket detection relay. +//! +//! This module receives normalized WebSocket JSON-RPC notifications from the +//! transport layer and forwards them to the Solana WS detection service. + +/// One forwarded WebSocket notification ready for detection. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbWsDetectionNotificationEnvelope { + /// Optional logical endpoint name. + pub endpoint_name: std::option::Option, + /// The parsed JSON-RPC notification. + pub notification: crate::KbJsonRpcWsNotification, +} + +impl KbWsDetectionNotificationEnvelope { + /// Creates a new notification envelope. + pub fn new( + endpoint_name: std::option::Option, + notification: crate::KbJsonRpcWsNotification, + ) -> Self { + Self { + endpoint_name, + notification, + } + } +} + +/// Runtime statistics for one relay worker. +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct KbWsDetectionRelayStats { + /// Number of received envelopes. + pub received_count: u64, + /// Number of ignored notifications. + pub ignored_count: u64, + /// Number of stored observations. + pub observation_count: u64, + /// Number of registered token candidates. + pub token_candidate_count: u64, + /// Number of processing errors. + pub error_count: u64, +} + +/// Asynchronous relay between `WsClient` notifications and the detection service. +#[derive(Debug, Clone)] +pub struct KbWsDetectionRelay { + /// Solana WS detection service. + detector: crate::KbSolanaWsDetectionService, +} + +impl KbWsDetectionRelay { + /// Creates a new relay. + pub fn new(detector: crate::KbSolanaWsDetectionService) -> Self { + Self { detector } + } + + /// Creates a bounded relay channel. + pub fn channel( + capacity: usize, + ) -> ( + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, + ) { + tokio::sync::mpsc::channel(capacity) + } + + /// Processes one forwarded notification. + pub async fn process_envelope( + &self, + envelope: &crate::KbWsDetectionNotificationEnvelope, + ) -> Result { + self.detector + .process_notification(envelope.endpoint_name.clone(), &envelope.notification) + .await + } + + /// Spawns one background relay worker. + pub fn spawn( + self, + mut receiver: tokio::sync::mpsc::Receiver, + ) -> tokio::task::JoinHandle { + tokio::spawn(async move { + let mut stats = crate::KbWsDetectionRelayStats::default(); + loop { + let recv_result = receiver.recv().await; + let envelope = match recv_result { + Some(envelope) => envelope, + None => break, + }; + stats.received_count += 1; + let outcome_result = self.process_envelope(&envelope).await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => { + stats.error_count += 1; + tracing::error!( + target: "kb_lib::detect::ws_relay", + "ws detection relay processing failed endpoint_name={:?}: {}", + envelope.endpoint_name, + error + ); + continue; + } + }; + match outcome { + crate::KbSolanaWsDetectionOutcome::Ignored => { + stats.ignored_count += 1; + } + crate::KbSolanaWsDetectionOutcome::ObservationRecorded { .. } => { + stats.observation_count += 1; + } + crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { .. } => { + stats.token_candidate_count += 1; + } + } + } + stats + }) + } +} + +#[cfg(test)] +mod tests { + async fn create_database() -> crate::KbDatabase { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("ws_relay.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed") + } + + fn build_slot_notification() -> crate::KbJsonRpcWsNotification { + crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "slotNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::json!({ + "slot": 414726860_u64, + "parent": 414726859_u64, + "root": 414726828_u64 + }), + subscription: 1008_u64, + }, + } + } + + #[tokio::test] + async fn relay_process_envelope_records_observation() { + let database = create_database().await; + let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let relay = crate::KbWsDetectionRelay::new(detector); + let envelope = crate::KbWsDetectionNotificationEnvelope::new( + Some("mainnet_public_ws_slots".to_string()), + build_slot_notification(), + ); + let outcome_result = relay.process_envelope(&envelope).await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => panic!("process_envelope failed: {error}"), + }; + match outcome { + crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => { + assert!(observation_id > 0); + } + _ => panic!("unexpected relay outcome"), + } + } + + #[tokio::test] + async fn relay_worker_processes_channel() { + let database = create_database().await; + let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let relay = crate::KbWsDetectionRelay::new(detector); + let (sender, receiver) = crate::KbWsDetectionRelay::channel(8); + let handle = relay.spawn(receiver); + let send_result = sender + .send(crate::KbWsDetectionNotificationEnvelope::new( + Some("mainnet_public_ws_slots".to_string()), + build_slot_notification(), + )) + .await; + if let Err(error) = send_result { + panic!("send failed: {error}"); + } + drop(sender); + let join_result = handle.await; + let stats = match join_result { + Ok(stats) => stats, + Err(error) => panic!("join failed: {error}"), + }; + assert_eq!(stats.received_count, 1); + assert_eq!(stats.observation_count, 1); + assert_eq!(stats.error_count, 0); + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 3e31f87..47b990b 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -159,3 +159,6 @@ pub use crate::detect::KbDetectionTokenCandidateInput; pub use crate::detect::KbDetectionTokenCandidateResult; pub use crate::detect::KbSolanaWsDetectionOutcome; pub use crate::detect::KbSolanaWsDetectionService; +pub use crate::detect::KbWsDetectionNotificationEnvelope; +pub use crate::detect::KbWsDetectionRelay; +pub use crate::detect::KbWsDetectionRelayStats; diff --git a/kb_lib/src/ws_client.rs b/kb_lib/src/ws_client.rs index 074f51b..1952018 100644 --- a/kb_lib/src/ws_client.rs +++ b/kb_lib/src/ws_client.rs @@ -233,6 +233,13 @@ pub struct WsClient { event_tx: tokio::sync::broadcast::Sender, runtime: std::sync::Arc>, registry: std::sync::Arc>, + detection_notification_forwarder: std::sync::Arc< + tokio::sync::Mutex< + std::option::Option< + tokio::sync::mpsc::Sender, + >, + >, + >, } impl WsClient { @@ -259,6 +266,7 @@ impl WsClient { event_tx, runtime: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRuntime::new())), registry: std::sync::Arc::new(tokio::sync::Mutex::new(WsClientRegistry::new())), + detection_notification_forwarder: std::sync::Arc::new(tokio::sync::Mutex::new(None)), }) } @@ -282,6 +290,21 @@ impl WsClient { self.event_tx.subscribe() } + /// Sets the optional detection notification forwarder. + pub async fn set_detection_notification_forwarder( + &self, + sender: tokio::sync::mpsc::Sender, + ) { + let mut guard = self.detection_notification_forwarder.lock().await; + *guard = Some(sender); + } + + /// Clears the optional detection notification forwarder. + pub async fn clear_detection_notification_forwarder(&self) { + let mut guard = self.detection_notification_forwarder.lock().await; + *guard = None; + } + /// Returns the next request identifier and increments the internal counter. pub fn next_request_id(&self) -> u64 { self.next_request_id @@ -1153,6 +1176,12 @@ impl WsClient { }); } } + forward_detection_notification_if_configured( + &self.detection_notification_forwarder, + self.endpoint.name.as_str(), + notification, + ) + .await; } } } @@ -1456,6 +1485,49 @@ fn kb_build_first_value_optional_config_params( params } +async fn forward_detection_notification_if_configured( + forwarder: &std::sync::Arc< + tokio::sync::Mutex< + std::option::Option< + tokio::sync::mpsc::Sender, + >, + >, + >, + endpoint_name: &str, + notification: &crate::KbJsonRpcWsNotification, +) { + let sender_option = { + let guard = forwarder.lock().await; + guard.clone() + }; + let sender = match sender_option { + Some(sender) => sender, + None => return, + }; + let envelope = crate::KbWsDetectionNotificationEnvelope::new( + Some(endpoint_name.to_string()), + notification.clone(), + ); + let send_result = sender.try_send(envelope); + match send_result { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + target: "kb_lib::ws_client", + "detection notification relay queue is full endpoint_name={}", + endpoint_name + ); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + tracing::warn!( + target: "kb_lib::ws_client", + "detection notification relay queue is closed endpoint_name={}", + endpoint_name + ); + } + } +} + #[cfg(test)] mod tests { use futures_util::SinkExt; @@ -2139,4 +2211,34 @@ mod tests { client.disconnect().await.expect("disconnect must succeed"); server.shutdown().await; } + + #[tokio::test] + async fn notification_is_forwarded_to_detection_relay() { + let server = TestWsServer::spawn_json_rpc_server().await; + let endpoint = make_ws_endpoint(server.url.clone()); + let client = crate::WsClient::new(endpoint).expect("client creation must succeed"); + let (relay_tx, mut relay_rx) = crate::KbWsDetectionRelay::channel(8); + client.set_detection_notification_forwarder(relay_tx).await; + let mut receiver = client.subscribe_events(); + client.connect().await.expect("connect must succeed"); + let _ = recv_event(&mut receiver).await; + let request_id = client + .send_json_rpc_request("slotSubscribe".to_string(), std::vec::Vec::new()) + .await + .expect("json-rpc send must succeed"); + assert_eq!(request_id, 1); + let envelope = tokio::time::timeout(std::time::Duration::from_secs(2), relay_rx.recv()) + .await + .expect("relay receive timeout") + .expect("relay must receive one envelope"); + assert_eq!(envelope.endpoint_name.as_deref(), Some("test_ws")); + assert_eq!(envelope.notification.method, "slotNotification"); + assert_eq!(envelope.notification.params.subscription, 77); + assert_eq!( + envelope.notification.params.result["slot"], + serde_json::Value::from(12u64) + ); + client.disconnect().await.expect("disconnect must succeed"); + server.shutdown().await; + } }