From 0f228b2ae56ec6e984bde0974d5b13feafa05cac Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Wed, 29 Apr 2026 20:07:18 +0200 Subject: [PATCH] 0.7.17 --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 98 ++++- kb_lib/src/lib.rs | 10 + kb_lib/src/ws_hybrid_observation.rs | 429 ++++++++++++++++++++ kb_lib/src/ws_hybrid_runtime.rs | 264 +++++++++++++ kb_lib/src/ws_hybrid_watch.rs | 284 ++++++++++++++ kb_lib/src/ws_manager.rs | 582 +++++++++++++++++++++++++++- 8 files changed, 1650 insertions(+), 20 deletions(-) create mode 100644 kb_lib/src/ws_hybrid_observation.rs create mode 100644 kb_lib/src/ws_hybrid_runtime.rs create mode 100644 kb_lib/src/ws_hybrid_watch.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 17b4c35..37802d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,3 +47,4 @@ 0.7.14 - Ajout d’une couche consolidée de traçabilité fondatrice multi-DEX avec enregistrement des pool origins, rattachement au decoded event, au pool/pair/listing et à l’éventuelle launch attribution 0.7.15 - Ajout d’une couche wallets observés et participations observées, avec extraction des rôles depuis les payloads décodés et rattachement transaction / decoded event / pool / pair 0.7.16 - Ajout d’une première couche trade events et pair metrics avec normalisation des swaps, agrégation par paire et branchement automatique dans le pipeline de résolution transactionnelle +0.7.17 - Ajout d’une première couche WS hybride avec collecte de cibles `programSubscribe` / `accountSubscribe` et persistance technique dédupliquée des notifications `logs / program / account` diff --git a/Cargo.toml b/Cargo.toml index 2496965..48d6c40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.16" +version = "0.7.17" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 7b60318..6f0401d 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -614,7 +614,73 @@ 
Réalisé : - branchement automatique dans le pipeline de résolution transactionnelle, - conservation d’un modèle simple et idempotent en préparation de futures candles / séries temporelles. -### 6.049. Version `0.7.x` — Couverture DEX v1 +### 6.049. Version `0.7.17` — Renforcement temps réel WS hybride +Réalisé : + +- conservation de `logsSubscribe` comme source canonique de signatures candidates, +- ajout d’une collecte de cibles `programSubscribe` à partir des DEX actifs connus, +- ajout d’une collecte de cibles `accountSubscribe` à partir des pools actifs connus, +- ajout d’une couche d’observations techniques WS hybrides pour `logs / program / account`, +- ajout d’une première déduplication en mémoire des notifications techniques reçues en parallèle, +- ajout d’une façade runtime pour exposer ce comportement au futur branchement `ws_manager`. + +### 6.050. Version `0.7.18` — Backfill historique ciblé par token +Objectif : permettre la reconstruction historique ciblée d’un token encore actif non observé en live. + +À faire : + +- ajouter une entrée `token_mint -> backfill`, +- retrouver les signatures historiques via `getSignaturesForAddress`, +- résoudre les transactions pertinentes via `getTransaction`, +- retrouver les pools et paires liés au token, +- reconstruire les swaps observables et les métriques dérivées, +- cibler prioritairement des tokens encore actifs comme `USDC`, `USDT`, `RAY`, `JUP`, +- éviter un scan exhaustif de toute la blockchain. + +### 6.051. Version `0.7.19` — Holdings observés +Objectif : compléter la couche acteurs observés avec une première vision des balances et comptes token observés, sans viser encore un portefeuille temps réel exhaustif. 
+ +À faire : + +- ajouter une table de holdings observés rattachés à `wallets` et `tokens`, +- distinguer snapshots observés et états recalculés, +- permettre l’enregistrement de balances observées depuis les payloads décodés ou les lectures RPC ciblées, +- rattacher un holding observé à une transaction, un slot et une source, +- préparer l’évolution future vers des reconstructions de portefeuille plus complètes. + +### 6.052. Version `0.7.20` — Candles / OHLCV +Objectif : compléter la première couche d’agrégats DEX avec des séries temporelles exploitables par paire. + +À faire : + +- ajouter des candles par paire et par fenêtre temporelle, +- calculer `open`, `high`, `low`, `close`, `volume` et `trade_count`, +- alimenter ces candles à partir des `trade events` déjà normalisés, +- conserver un modèle idempotent apte à être recalculé ou consolidé, +- préparer la couche analytique riche de `0.8.x`. + +### 6.053. Version `0.7.21` — Signaux analytiques plus riches +Objectif : préparer, avant `0.8.x`, une première couche de signaux enrichis au-dessus des objets déjà consolidés. + +À faire : + +- produire des signaux plus riches à partir des `pair metrics`, `wallet participations`, `pool origins` et `launch origins`, +- introduire des signaux de démarrage d’activité, accélération, concentration, absence de liquidité ou premières anomalies observées, +- distinguer clairement signaux techniques, signaux métier et signaux analytiques enrichis, +- conserver une logique idempotente et explicable avant l’arrivée des filtres plus complexes de `0.8.x`. + +### 6.054. Version `0.7.22` — `kb_app` : inspection et tests du pipeline `0.7.x` +Objectif : permettre depuis l’application desktop de tester, inspecter et valider tout le pipeline `0.7.x` sans recourir uniquement aux logs bruts ou à SQLite. 
+ +À faire : + +- ajouter une ou plusieurs vues `kb_app` dédiées à l’inspection des transactions résolues, événements DEX décodés, pools, paires, launch origins, pool origins, wallets observés, holdings observés et trade events, +- permettre la recherche par signature, pool, paire, token mint ou wallet, +- afficher les liens entre objets techniques et objets métier, +- permettre de lancer manuellement certains backfills ou inspections ciblées, +- fournir un socle UI pour tester en pratique tout ce qui a été construit dans la série `0.7.x`. + +### 6.055. Version `0.7.x` — Couverture DEX v1 Objectif : structurer les connecteurs DEX autour d’un pipeline complet de résolution, décodage et normalisation métier. Protocoles cibles : @@ -637,10 +703,11 @@ Résultat attendu : - identification fiable des programmes et versions, - résolution des signatures pertinentes, - décodage des transactions utiles, -- création d’objets métier riches pour tokens, pools, paires, listings et participants, -- remplacement progressif des scripts heuristiques externes par des composants Rust intégrés. +- création d’objets métier riches pour tokens, pools, paires, listings, participants et holdings observés, +- préparation d’une détection temps réel hybride et d’un backfill ciblé compatible avec les mêmes objets métier, +- préparation d’agrégats DEX plus riches, de candles / OHLCV et d’une UI d’inspection du pipeline `0.7.x`. -### 6.050. Version `0.8.x` — Analyse et filtrage +### 6.056. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -649,9 +716,10 @@ Objectif : transformer les événements bruts en signaux exploitables. - règles de filtrage, - exclusions des tokens non tradables, - statistiques de comportement, -- premiers patterns. +- premiers patterns, +- enrichissement des signaux analytiques préparés en fin de `0.7.x`. -### 6.051. Version `1.x.y` — Wallets et swap préparatoire +### 6.057. 
Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -662,7 +730,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.052. Version `2.x.y` — Trading semi-automatisé +### 6.058. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -673,7 +741,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.053. Version `3.x.y` — Yellowstone gRPC +### 6.059. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : @@ -761,9 +829,11 @@ Le projet doit maintenir au minimum : La priorité immédiate est désormais la suivante : -1. démarrer la version `0.7.10` avec le premier support `Orca / Whirlpools`, -2. conserver un décodeur séparé par protocole et par version, -3. préparer ensuite la version `0.7.11` pour `FluxBeam`, -4. préparer ensuite la version `0.7.12` pour `DexLab`, -5. étendre ensuite la couche `launch origins` à `Bags` et `Moonit`, -6. garder l’unification multi-DEX et la consolidation métier pour `0.7.14`. +1. finaliser complètement la fin de série `0.7.x` avant l’ouverture de `0.8.x`, +2. ajouter un renforcement temps réel hybride avec `logsSubscribe`, `programSubscribe` et `accountSubscribe` en parallèle des sources déjà exploitées, +3. conserver la résolution transactionnelle comme source de normalisation commune, +4. ajouter ensuite un mode de backfill historique ciblé par `token_mint` pour des tokens encore actifs donnés explicitement, +5. compléter la couche métier avec des `holdings observés`, +6. ajouter des `candles / OHLCV` et une première couche de signaux analytiques plus riches, +7. doter `kb_app` d’une vraie UI d’inspection et de test pour l’ensemble du pipeline `0.7.x`, +8. 
préparer enfin l’arrivée de Yellowstone gRPC comme extension de capacité, et non comme remplacement du socle existant. diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index f56069b..36398ac 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -29,6 +29,9 @@ mod tx_resolution; mod types; mod wallet_observation; mod ws_client; +mod ws_hybrid_observation; +mod ws_hybrid_runtime; +mod ws_hybrid_watch; mod ws_manager; pub use config::KbAppConfig; @@ -295,6 +298,13 @@ pub use ws_client::WsClient; pub use ws_client::WsEvent; pub use ws_client::WsOutgoingMessage; pub use ws_client::WsSubscriptionInfo; +pub use ws_hybrid_observation::KbWsHybridObservationResult; +pub use ws_hybrid_observation::KbWsHybridObservationService; +pub use ws_hybrid_runtime::KbWsHybridRuntimeService; +pub use ws_hybrid_watch::KbWsHybridWatchService; +pub use ws_hybrid_watch::KbWsHybridWatchSnapshot; +pub use ws_hybrid_watch::KbWsWatchTarget; +pub use ws_hybrid_watch::KbWsWatchTargetKind; pub use ws_manager::WsManagedEndpointSnapshot; pub use ws_manager::WsManager; pub use ws_manager::WsManagerSnapshot; diff --git a/kb_lib/src/ws_hybrid_observation.rs b/kb_lib/src/ws_hybrid_observation.rs new file mode 100644 index 0000000..2ad7653 --- /dev/null +++ b/kb_lib/src/ws_hybrid_observation.rs @@ -0,0 +1,429 @@ +// file: kb_lib/src/ws_hybrid_observation.rs + +//! Hybrid WebSocket technical observation service. + +use std::hash::Hasher; + +/// One hybrid WebSocket observation result. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbWsHybridObservationResult { + /// Stable observation name. + pub observation_name: std::string::String, + /// Stable deduplication key. + pub dedupe_key: std::string::String, + /// Optional watched address. + pub watched_address: std::option::Option, + /// Optional observed slot. + pub slot: std::option::Option, + /// Whether this observation was newly recorded during this service lifetime. 
+ pub created_observation: bool, +} + +/// Hybrid WebSocket technical observation service. +#[derive(Debug, Clone)] +pub struct KbWsHybridObservationService { + persistence: crate::KbDetectionPersistenceService, + seen_dedupe_keys: + std::sync::Arc>>, +} + +impl KbWsHybridObservationService { + /// Creates a new hybrid WebSocket technical observation service. + pub fn new(database: std::sync::Arc) -> Self { + let persistence = crate::KbDetectionPersistenceService::new(database.clone()); + Self { + persistence, + seen_dedupe_keys: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashSet::::new(), + )), + } + } + + /// Records one `logsNotification` payload. + pub async fn record_logs_notification( + &self, + endpoint_name: std::option::Option, + payload: &serde_json::Value, + ) -> Result { + let signature = kb_extract_string_by_candidate_keys(payload, &["signature"]); + self.record_observation_inner( + "ws.hybrid.logs_notification".to_string(), + "signal.ws.hybrid.logs_notification".to_string(), + endpoint_name, + signature, + payload, + ) + .await + } + + /// Records one `programNotification` payload for one watched program id. + pub async fn record_program_notification( + &self, + endpoint_name: std::option::Option, + watched_program_id: std::string::String, + payload: &serde_json::Value, + ) -> Result { + let pubkey = kb_extract_string_by_candidate_keys(payload, &["pubkey"]); + let watched_address = match pubkey { + Some(pubkey) => Some(pubkey), + None => Some(watched_program_id), + }; + self.record_observation_inner( + "ws.hybrid.program_notification".to_string(), + "signal.ws.hybrid.program_notification".to_string(), + endpoint_name, + watched_address, + payload, + ) + .await + } + + /// Records one `accountNotification` payload for one watched account address. 
+ pub async fn record_account_notification( + &self, + endpoint_name: std::option::Option, + watched_account: std::string::String, + payload: &serde_json::Value, + ) -> Result { + self.record_observation_inner( + "ws.hybrid.account_notification".to_string(), + "signal.ws.hybrid.account_notification".to_string(), + endpoint_name, + Some(watched_account), + payload, + ) + .await + } + + async fn record_observation_inner( + &self, + observation_name: std::string::String, + signal_name: std::string::String, + endpoint_name: std::option::Option, + watched_address: std::option::Option, + payload: &serde_json::Value, + ) -> Result { + let slot = kb_extract_slot(payload); + let payload_hash = kb_hash_payload(payload); + let dedupe_key = kb_build_ws_observation_dedupe_key( + observation_name.as_str(), + endpoint_name.as_deref(), + watched_address.as_deref(), + slot, + payload_hash.as_str(), + ); + let mut seen_guard = self.seen_dedupe_keys.lock().await; + let already_seen = seen_guard.contains(&dedupe_key); + if !already_seen { + seen_guard.insert(dedupe_key.clone()); + } + drop(seen_guard); + if already_seen { + return Ok(crate::KbWsHybridObservationResult { + observation_name, + dedupe_key, + watched_address, + slot, + created_observation: false, + }); + } + let observation_payload = payload.clone(); + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + observation_name.clone(), + crate::KbObservationSourceKind::WsRpc, + endpoint_name.clone(), + dedupe_key.clone(), + slot, + observation_payload.clone(), + )) + .await; + if let Err(error) = observation_result { + return Err(error); + } + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + signal_name, + crate::KbAnalysisSignalSeverity::Low, + dedupe_key.clone(), + None, + None, + observation_payload, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + Ok(crate::KbWsHybridObservationResult { + 
observation_name, + dedupe_key, + watched_address, + slot, + created_observation: true, + }) + } +} + +fn kb_extract_slot(value: &serde_json::Value) -> std::option::Option { + kb_extract_u64_by_candidate_keys(value, &["slot"]) +} + +fn kb_extract_string_by_candidate_keys( + value: &serde_json::Value, + candidate_keys: &[&str], +) -> std::option::Option { + if let Some(object) = value.as_object() { + for candidate_key in candidate_keys { + let direct_option = object.get(*candidate_key); + if let Some(direct) = direct_option { + let text_option = direct.as_str(); + if let Some(text) = text_option { + return Some(text.to_string()); + } + } + } + for nested_value in object.values() { + let nested_result = kb_extract_string_by_candidate_keys(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + return None; + } + if let Some(array) = value.as_array() { + for nested_value in array { + let nested_result = kb_extract_string_by_candidate_keys(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + } + None +} + +fn kb_extract_u64_by_candidate_keys( + value: &serde_json::Value, + candidate_keys: &[&str], +) -> std::option::Option { + if let Some(object) = value.as_object() { + for candidate_key in candidate_keys { + let direct_option = object.get(*candidate_key); + if let Some(direct) = direct_option { + let number_option = direct.as_u64(); + if let Some(number) = number_option { + return Some(number); + } + } + } + for nested_value in object.values() { + let nested_result = kb_extract_u64_by_candidate_keys(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + return None; + } + if let Some(array) = value.as_array() { + for nested_value in array { + let nested_result = kb_extract_u64_by_candidate_keys(nested_value, candidate_keys); + if nested_result.is_some() { + return nested_result; + } + } + } + None +} + +fn kb_hash_payload(payload: 
&serde_json::Value) -> std::string::String { + let payload_text_result = serde_json::to_string(payload); + let payload_text = match payload_text_result { + Ok(payload_text) => payload_text, + Err(_) => return "serde_error".to_string(), + }; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + std::hash::Hash::hash(&payload_text, &mut hasher); + hasher.finish().to_string() +} + +fn kb_build_ws_observation_dedupe_key( + source_method: &str, + endpoint_name: std::option::Option<&str>, + address: std::option::Option<&str>, + slot: std::option::Option, + payload_hash: &str, +) -> std::string::String { + format!( + "{}:{}:{}:{}:{}", + source_method, + endpoint_name.unwrap_or_default(), + address.unwrap_or_default(), + slot.unwrap_or_default(), + payload_hash + ) +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("ws_hybrid_observation.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + #[tokio::test] + async fn record_program_notification_is_idempotent_for_same_payload() { + let database = make_database().await; + let service = crate::KbWsHybridObservationService::new(database); + let payload = serde_json::json!({ + "method": "programNotification", + "params": { + "result": 
{ + "context": { + "slot": 111 + }, + "value": { + "pubkey": "ProgramOwnedAccount111", + "account": { + "lamports": 1, + "owner": "Program111" + } + } + } + } + }); + let first_result = service + .record_program_notification( + Some("helius_primary_ws_programs".to_string()), + "Program111".to_string(), + &payload, + ) + .await; + let first = match first_result { + Ok(first) => first, + Err(error) => panic!("first program notification must succeed: {}", error), + }; + assert!(first.created_observation); + let second_result = service + .record_program_notification( + Some("helius_primary_ws_programs".to_string()), + "Program111".to_string(), + &payload, + ) + .await; + let second = match second_result { + Ok(second) => second, + Err(error) => panic!("second program notification must succeed: {}", error), + }; + assert!(!second.created_observation); + assert_eq!(first.dedupe_key, second.dedupe_key); + } + + #[tokio::test] + async fn record_account_notification_is_idempotent_for_same_payload() { + let database = make_database().await; + let service = crate::KbWsHybridObservationService::new(database); + let payload = serde_json::json!({ + "method": "accountNotification", + "params": { + "result": { + "context": { + "slot": 222 + }, + "value": { + "lamports": 10, + "owner": "Program111" + } + } + } + }); + let first_result = service + .record_account_notification( + Some("mainnet_public_ws_accounts".to_string()), + "PoolAccount111".to_string(), + &payload, + ) + .await; + let first = match first_result { + Ok(first) => first, + Err(error) => panic!("first account notification must succeed: {}", error), + }; + assert!(first.created_observation); + let second_result = service + .record_account_notification( + Some("mainnet_public_ws_accounts".to_string()), + "PoolAccount111".to_string(), + &payload, + ) + .await; + let second = match second_result { + Ok(second) => second, + Err(error) => panic!("second account notification must succeed: {}", error), + }; + 
assert!(!second.created_observation); + assert_eq!(first.dedupe_key, second.dedupe_key); + } + + #[tokio::test] + async fn record_logs_notification_uses_signature_for_dedupe() { + let database = make_database().await; + let service = crate::KbWsHybridObservationService::new(database); + let payload = serde_json::json!({ + "method": "logsNotification", + "params": { + "result": { + "context": { + "slot": 333 + }, + "value": { + "signature": "LogsSignature111", + "err": null, + "logs": [ + "Program log: Instruction: InitializePool" + ] + } + } + } + }); + let first_result = service + .record_logs_notification(Some("mainnet_public_ws_logs".to_string()), &payload) + .await; + let first = match first_result { + Ok(first) => first, + Err(error) => panic!("first logs notification must succeed: {}", error), + }; + assert!(first.created_observation); + assert_eq!(first.watched_address, Some("LogsSignature111".to_string())); + let second_result = service + .record_logs_notification(Some("mainnet_public_ws_logs".to_string()), &payload) + .await; + let second = match second_result { + Ok(second) => second, + Err(error) => panic!("second logs notification must succeed: {}", error), + }; + assert!(!second.created_observation); + assert_eq!(first.dedupe_key, second.dedupe_key); + } +} diff --git a/kb_lib/src/ws_hybrid_runtime.rs b/kb_lib/src/ws_hybrid_runtime.rs new file mode 100644 index 0000000..f04b2ac --- /dev/null +++ b/kb_lib/src/ws_hybrid_runtime.rs @@ -0,0 +1,264 @@ +// file: kb_lib/src/ws_hybrid_runtime.rs + +//! Hybrid WebSocket runtime facade. + +/// Facade service combining hybrid watch-target collection and +/// hybrid technical observation recording. +#[derive(Debug, Clone)] +pub struct KbWsHybridRuntimeService { + watch_service: crate::KbWsHybridWatchService, + observation_service: crate::KbWsHybridObservationService, +} + +impl KbWsHybridRuntimeService { + /// Creates a new hybrid WebSocket runtime facade. 
+ pub fn new(database: std::sync::Arc) -> Self { + let watch_service = crate::KbWsHybridWatchService::new(database.clone()); + let observation_service = crate::KbWsHybridObservationService::new(database); + Self { + watch_service, + observation_service, + } + } + + /// Collects the current hybrid watch snapshot. + pub async fn collect_watch_snapshot( + &self, + ) -> Result { + self.watch_service.collect_snapshot().await + } + + /// Records one `logsNotification` payload. + pub async fn record_logs_notification( + &self, + endpoint_name: std::option::Option, + payload: &serde_json::Value, + ) -> Result { + self.observation_service + .record_logs_notification(endpoint_name, payload) + .await + } + + /// Records one `programNotification` payload. + pub async fn record_program_notification( + &self, + endpoint_name: std::option::Option, + watched_program_id: std::string::String, + payload: &serde_json::Value, + ) -> Result { + self.observation_service + .record_program_notification(endpoint_name, watched_program_id, payload) + .await + } + + /// Records one `accountNotification` payload. 
+ pub async fn record_account_notification( + &self, + endpoint_name: std::option::Option, + watched_account: std::string::String, + payload: &serde_json::Value, + ) -> Result { + self.observation_service + .record_account_notification(endpoint_name, watched_account, payload) + .await + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("ws_hybrid_runtime.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + async fn seed_dexes_and_pools(database: std::sync::Arc) { + let dex_a = crate::KbDexDto::new( + "raydium_amm_v4".to_string(), + "Raydium AmmV4".to_string(), + Some("RaydiumProgram111".to_string()), + None, + true, + ); + let dex_a_id_result = crate::upsert_dex(database.as_ref(), &dex_a).await; + let dex_a_id = match dex_a_id_result { + Ok(dex_a_id) => dex_a_id, + Err(error) => panic!("dex A upsert must succeed: {}", error), + }; + let dex_b = crate::KbDexDto::new( + "meteora_dbc".to_string(), + "Meteora DBC".to_string(), + Some("MeteoraProgram111".to_string()), + Some("SharedRouter111".to_string()), + true, + ); + let dex_b_id_result = crate::upsert_dex(database.as_ref(), &dex_b).await; + let dex_b_id = match dex_b_id_result { + Ok(dex_b_id) => dex_b_id, + Err(error) => panic!("dex 
B upsert must succeed: {}", error), + }; + let dex_c = crate::KbDexDto::new( + "pump_swap".to_string(), + "PumpSwap".to_string(), + Some("PumpSwapProgram111".to_string()), + Some("SharedRouter111".to_string()), + true, + ); + let dex_c_id_result = crate::upsert_dex(database.as_ref(), &dex_c).await; + let dex_c_id = match dex_c_id_result { + Ok(dex_c_id) => dex_c_id, + Err(error) => panic!("dex C upsert must succeed: {}", error), + }; + let pool_a = crate::KbPoolDto::new( + dex_a_id, + "PoolAddress111".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ); + let pool_a_result = crate::upsert_pool(database.as_ref(), &pool_a).await; + if let Err(error) = pool_a_result { + panic!("pool A upsert must succeed: {}", error); + } + let pool_b = crate::KbPoolDto::new( + dex_b_id, + "PoolAddress222".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ); + let pool_b_result = crate::upsert_pool(database.as_ref(), &pool_b).await; + if let Err(error) = pool_b_result { + panic!("pool B upsert must succeed: {}", error); + } + let pool_c = crate::KbPoolDto::new( + dex_c_id, + "PoolAddress333".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Inactive, + ); + let pool_c_result = crate::upsert_pool(database.as_ref(), &pool_c).await; + if let Err(error) = pool_c_result { + panic!("pool C upsert must succeed: {}", error); + } + } + + #[tokio::test] + async fn collect_watch_snapshot_combines_program_and_account_targets() { + let database = make_database().await; + seed_dexes_and_pools(database.clone()).await; + let service = crate::KbWsHybridRuntimeService::new(database); + let snapshot_result = service.collect_watch_snapshot().await; + let snapshot = match snapshot_result { + Ok(snapshot) => snapshot, + Err(error) => panic!("collect watch snapshot must succeed: {}", error), + }; + assert_eq!(snapshot.program_targets.len(), 4); + assert_eq!(snapshot.account_targets.len(), 2); + } + + #[tokio::test] + async fn 
record_notifications_are_exposed_through_runtime_facade() { + let database = make_database().await; + let service = crate::KbWsHybridRuntimeService::new(database); + let logs_payload = serde_json::json!({ + "method": "logsNotification", + "params": { + "result": { + "context": { + "slot": 123 + }, + "value": { + "signature": "RuntimeLogsSig111", + "err": null, + "logs": [ + "Program log: Instruction: InitializePool" + ] + } + } + } + }); + let logs_result = service + .record_logs_notification(Some("mainnet_public_ws_logs".to_string()), &logs_payload) + .await; + let logs_observation = match logs_result { + Ok(logs_observation) => logs_observation, + Err(error) => panic!("logs record must succeed: {}", error), + }; + assert!(logs_observation.created_observation); + let program_payload = serde_json::json!({ + "method": "programNotification", + "params": { + "result": { + "context": { + "slot": 124 + }, + "value": { + "pubkey": "RuntimeProgramOwned111", + "account": { + "lamports": 1, + "owner": "Program111" + } + } + } + } + }); + let program_result = service + .record_program_notification( + Some("helius_primary_ws_programs".to_string()), + "Program111".to_string(), + &program_payload, + ) + .await; + let program_observation = match program_result { + Ok(program_observation) => program_observation, + Err(error) => panic!("program record must succeed: {}", error), + }; + assert!(program_observation.created_observation); + let account_payload = serde_json::json!({ + "method": "accountNotification", + "params": { + "result": { + "context": { + "slot": 125 + }, + "value": { + "lamports": 10, + "owner": "Program111" + } + } + } + }); + let account_result = service + .record_account_notification( + Some("mainnet_public_ws_accounts".to_string()), + "RuntimePool111".to_string(), + &account_payload, + ) + .await; + let account_observation = match account_result { + Ok(account_observation) => account_observation, + Err(error) => panic!("account record must succeed: {}", 
error), + }; + assert!(account_observation.created_observation); + } +} diff --git a/kb_lib/src/ws_hybrid_watch.rs b/kb_lib/src/ws_hybrid_watch.rs new file mode 100644 index 0000000..07841d7 --- /dev/null +++ b/kb_lib/src/ws_hybrid_watch.rs @@ -0,0 +1,284 @@ +// file: kb_lib/src/ws_hybrid_watch.rs + +//! Hybrid WebSocket watch-target collection service. + +/// Kind of hybrid WebSocket watch target. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum KbWsWatchTargetKind { + /// Program-level watch target for `programSubscribe`. + Program, + /// Account-level watch target for `accountSubscribe`. + Account, +} + +/// One hybrid WebSocket watch target. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbWsWatchTarget { + /// Target kind. + pub kind: crate::KbWsWatchTargetKind, + /// Base-58 address to watch. + pub address: std::string::String, + /// Logical source label. + pub logical_source: std::string::String, +} + +/// Snapshot of hybrid WebSocket watch targets. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbWsHybridWatchSnapshot { + /// Program targets suitable for `programSubscribe`. + pub program_targets: std::vec::Vec, + /// Account targets suitable for `accountSubscribe`. + pub account_targets: std::vec::Vec, +} + +/// Hybrid WebSocket watch-target collection service. +#[derive(Debug, Clone)] +pub struct KbWsHybridWatchService { + database: std::sync::Arc, +} + +impl KbWsHybridWatchService { + /// Creates a new hybrid watch-target collection service. + pub fn new(database: std::sync::Arc) -> Self { + Self { database } + } + + /// Collects the deduplicated set of DEX program targets. 
+ pub async fn collect_program_targets( + &self, + ) -> Result, crate::KbError> { + let dexes_result = crate::list_dexes(self.database.as_ref()).await; + let dexes = match dexes_result { + Ok(dexes) => dexes, + Err(error) => return Err(error), + }; + let mut seen = std::collections::HashSet::::new(); + let mut targets = std::vec::Vec::new(); + for dex in dexes { + if !dex.is_enabled { + continue; + } + if let Some(program_id) = dex.program_id.clone() { + if !program_id.is_empty() && !seen.contains(&program_id) { + seen.insert(program_id.clone()); + targets.push(crate::KbWsWatchTarget { + kind: crate::KbWsWatchTargetKind::Program, + address: program_id, + logical_source: format!("dex:{}:program_id", dex.code), + }); + } + } + if let Some(router_program_id) = dex.router_program_id.clone() { + if !router_program_id.is_empty() && !seen.contains(&router_program_id) { + seen.insert(router_program_id.clone()); + targets.push(crate::KbWsWatchTarget { + kind: crate::KbWsWatchTargetKind::Program, + address: router_program_id, + logical_source: format!("dex:{}:router_program_id", dex.code), + }); + } + } + } + targets.sort_by(|left, right| left.address.cmp(&right.address)); + Ok(targets) + } + + /// Collects the deduplicated set of active pool-account targets. 
+    pub async fn collect_account_targets(
+        &self,
+    ) -> Result<std::vec::Vec<crate::KbWsWatchTarget>, crate::KbError> {
+        let pools_result = crate::list_pools(self.database.as_ref()).await;
+        let pools = match pools_result {
+            Ok(pools) => pools,
+            Err(error) => return Err(error),
+        };
+        let mut seen = std::collections::HashSet::<std::string::String>::new();
+        let mut targets = std::vec::Vec::new();
+        for pool in pools {
+            // Only active, addressable, not-yet-seen pools become targets.
+            if pool.status != crate::KbPoolStatus::Active {
+                continue;
+            }
+            if pool.address.is_empty() {
+                continue;
+            }
+            if seen.contains(&pool.address) {
+                continue;
+            }
+            seen.insert(pool.address.clone());
+            targets.push(crate::KbWsWatchTarget {
+                kind: crate::KbWsWatchTargetKind::Account,
+                address: pool.address.clone(),
+                logical_source: format!("pool:{}", pool.dex_id),
+            });
+        }
+        targets.sort_by(|left, right| left.address.cmp(&right.address));
+        Ok(targets)
+    }
+
+    /// Collects both program and account targets in one snapshot.
+    pub async fn collect_snapshot(
+        &self,
+    ) -> Result<crate::KbWsHybridWatchSnapshot, crate::KbError> {
+        let program_targets_result = self.collect_program_targets().await;
+        let program_targets = match program_targets_result {
+            Ok(program_targets) => program_targets,
+            Err(error) => return Err(error),
+        };
+        let account_targets_result = self.collect_account_targets().await;
+        let account_targets = match account_targets_result {
+            Ok(account_targets) => account_targets,
+            Err(error) => return Err(error),
+        };
+        Ok(crate::KbWsHybridWatchSnapshot {
+            program_targets,
+            account_targets,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    async fn make_database() -> std::sync::Arc<crate::KbDatabase> {
+        let tempdir_result = tempfile::tempdir();
+        let tempdir = match tempdir_result {
+            Ok(tempdir) => tempdir,
+            Err(error) => panic!("tempdir must succeed: {}", error),
+        };
+        let database_path = tempdir.path().join("ws_hybrid_watch.sqlite3");
+        let config = crate::KbDatabaseConfig {
+            enabled: true,
+            backend: crate::KbDatabaseBackend::Sqlite,
+            sqlite: crate::KbSqliteDatabaseConfig {
+                path: database_path.to_string_lossy().to_string(),
+                create_if_missing: true,
+                busy_timeout_ms: 5000,
+                max_connections: 1,
+                auto_initialize_schema: true,
+                use_wal: true,
+            },
+        };
+        let database_result = crate::KbDatabase::connect_and_initialize(&config).await;
+        let database = match database_result {
+            Ok(database) => database,
+            Err(error) => panic!("database init must succeed: {}", error),
+        };
+        std::sync::Arc::new(database)
+    }
+
+    async fn seed_dexes_and_pools(database: std::sync::Arc<crate::KbDatabase>) {
+        let dex_a = crate::KbDexDto::new(
+            "raydium_amm_v4".to_string(),
+            "Raydium AmmV4".to_string(),
+            Some("RaydiumProgram111".to_string()),
+            None,
+            true,
+        );
+        let dex_a_id_result = crate::upsert_dex(database.as_ref(), &dex_a).await;
+        let dex_a_id = match dex_a_id_result {
+            Ok(dex_a_id) => dex_a_id,
+            Err(error) => panic!("dex A upsert must succeed: {}", error),
+        };
+        let dex_b = crate::KbDexDto::new(
+            "meteora_dbc".to_string(),
+            "Meteora DBC".to_string(),
+            Some("MeteoraProgram111".to_string()),
+            Some("SharedRouter111".to_string()),
+            true,
+        );
+        let dex_b_id_result = crate::upsert_dex(database.as_ref(), &dex_b).await;
+        let dex_b_id = match dex_b_id_result {
+            Ok(dex_b_id) => dex_b_id,
+            Err(error) => panic!("dex B upsert must succeed: {}", error),
+        };
+        let dex_c = crate::KbDexDto::new(
+            "pump_swap".to_string(),
+            "PumpSwap".to_string(),
+            Some("PumpSwapProgram111".to_string()),
+            Some("SharedRouter111".to_string()),
+            true,
+        );
+        let dex_c_id_result = crate::upsert_dex(database.as_ref(), &dex_c).await;
+        let dex_c_id = match dex_c_id_result {
+            Ok(dex_c_id) => dex_c_id,
+            Err(error) => panic!("dex C upsert must succeed: {}", error),
+        };
+        let pool_a = crate::KbPoolDto::new(
+            dex_a_id,
+            "PoolAddress111".to_string(),
+            crate::KbPoolKind::Amm,
+            crate::KbPoolStatus::Active,
+        );
+        let pool_a_result = crate::upsert_pool(database.as_ref(), &pool_a).await;
+        if let Err(error) = pool_a_result {
+            panic!("pool A upsert must succeed: {}", error);
+        }
+        let pool_b = crate::KbPoolDto::new(
+            dex_b_id,
+            "PoolAddress222".to_string(),
+            crate::KbPoolKind::Amm,
+            crate::KbPoolStatus::Active,
+        );
+        let pool_b_result = crate::upsert_pool(database.as_ref(), &pool_b).await;
+        if let Err(error) = pool_b_result {
+            panic!("pool B upsert must succeed: {}", error);
+        }
+        let pool_c = crate::KbPoolDto::new(
+            dex_c_id,
+            "PoolAddress333".to_string(),
+            crate::KbPoolKind::Amm,
+            crate::KbPoolStatus::Inactive,
+        );
+        let pool_c_result = crate::upsert_pool(database.as_ref(), &pool_c).await;
+        if let Err(error) = pool_c_result {
+            panic!("pool C upsert must succeed: {}", error);
+        }
+    }
+
+    #[tokio::test]
+    async fn collect_program_targets_deduplicates_program_and_router_ids() {
+        let database = make_database().await;
+        seed_dexes_and_pools(database.clone()).await;
+        let service = crate::KbWsHybridWatchService::new(database);
+        let targets_result = service.collect_program_targets().await;
+        let targets = match targets_result {
+            Ok(targets) => targets,
+            Err(error) => panic!("collect program targets must succeed: {}", error),
+        };
+        assert_eq!(targets.len(), 4);
+        let mut addresses = std::vec::Vec::new();
+        for target in &targets {
+            assert_eq!(target.kind, crate::KbWsWatchTargetKind::Program);
+            addresses.push(target.address.clone());
+        }
+        addresses.sort();
+        assert_eq!(
+            addresses,
+            vec![
+                "MeteoraProgram111".to_string(),
+                "PumpSwapProgram111".to_string(),
+                "RaydiumProgram111".to_string(),
+                "SharedRouter111".to_string(),
+            ]
+        );
+    }
+
+    #[tokio::test]
+    async fn collect_account_targets_only_keeps_active_pools() {
+        let database = make_database().await;
+        seed_dexes_and_pools(database.clone()).await;
+        let service = crate::KbWsHybridWatchService::new(database);
+        let targets_result = service.collect_account_targets().await;
+        let targets = match targets_result {
+            Ok(targets) => targets,
+            Err(error) => panic!("collect account targets must succeed: {}", error),
+        };
+        assert_eq!(targets.len(), 2);
+        let mut addresses = std::vec::Vec::new();
+        for target in &targets {
+            assert_eq!(target.kind, crate::KbWsWatchTargetKind::Account);
+            addresses.push(target.address.clone());
+        }
+        addresses.sort();
+        assert_eq!(
+            addresses,
+            vec!["PoolAddress111".to_string(), "PoolAddress222".to_string(),]
+        );
+    }
+}
diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs
index d9ef516..3ff08a7 100644
--- a/kb_lib/src/ws_manager.rs
+++ b/kb_lib/src/ws_manager.rs
@@ -53,6 +53,8 @@ pub struct WsManager {
     >,
     transaction_resolution_relay_abort_handle:
        tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
+    hybrid_observation_relay_abort_handle:
+        tokio::sync::Mutex<std::option::Option<tokio::task::AbortHandle>>,
 }
 
 impl WsManager {
@@ -104,6 +106,7 @@ impl WsManager {
             detection_relay_abort_handle: tokio::sync::Mutex::new(None),
             transaction_resolution_relay_sender: tokio::sync::Mutex::new(None),
             transaction_resolution_relay_abort_handle: tokio::sync::Mutex::new(None),
+            hybrid_observation_relay_abort_handle: tokio::sync::Mutex::new(None),
         })
     }
 
@@ -469,11 +472,7 @@ impl WsManager {
                 ));
             }
         }
-        let resolver = crate::KbTransactionResolutionService::new(
-            http_pool,
-            database,
-            http_role,
-        );
+        let resolver = crate::KbTransactionResolutionService::new(http_pool, database, http_role);
         let relay = crate::KbWsTransactionResolutionRelay::new(resolver);
         let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity);
         let relay_task = relay.spawn(receiver);
@@ -530,6 +529,50 @@
         }
         Ok(())
     }
+
+    /// Collects the current hybrid WebSocket watch snapshot.
+    pub async fn collect_hybrid_watch_snapshot(
+        &self,
+        database: std::sync::Arc<crate::KbDatabase>,
+    ) -> Result<crate::KbWsHybridWatchSnapshot, crate::KbError> {
+        let runtime = crate::KbWsHybridRuntimeService::new(database);
+        runtime.collect_watch_snapshot().await
+    }
+
+    /// Attaches one shared hybrid observation relay to the manager event stream.
+    pub async fn attach_hybrid_observation_relay(
+        &self,
+        database: std::sync::Arc<crate::KbDatabase>,
+    ) -> Result<(), crate::KbError> {
+        {
+            let abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
+            if abort_guard.is_some() {
+                return Err(crate::KbError::InvalidState(
+                    "websocket hybrid observation relay is already attached".to_string(),
+                ));
+            }
+        }
+        let runtime = crate::KbWsHybridRuntimeService::new(database);
+        let receiver = self.subscribe_events();
+        let abort_handle = spawn_hybrid_observation_relay_task(receiver, runtime);
+        {
+            let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
+            *abort_guard = Some(abort_handle);
+        }
+        Ok(())
+    }
+
+    /// Detaches the shared hybrid observation relay from the manager event stream.
+    pub async fn detach_hybrid_observation_relay(&self) -> Result<(), crate::KbError> {
+        let abort_handle_option = {
+            let mut abort_guard = self.hybrid_observation_relay_abort_handle.lock().await;
+            abort_guard.take()
+        };
+        if let Some(abort_handle) = abort_handle_option {
+            abort_handle.abort();
+        }
+        Ok(())
+    }
 }
 
 impl Drop for WsManager {
@@ -552,6 +595,12 @@
                 abort_handle.abort();
             }
         }
+        let hybrid_abort_result = self.hybrid_observation_relay_abort_handle.try_lock();
+        if let Ok(mut abort_guard) = hybrid_abort_result {
+            if let Some(abort_handle) = abort_guard.take() {
+                abort_handle.abort();
+            }
+        }
     }
 }
 
@@ -574,10 +623,179 @@
                 }
             }
         }
     });
+    task.abort_handle()
+}
+
+fn spawn_hybrid_observation_relay_task(
+    mut receiver: tokio::sync::broadcast::Receiver<crate::WsEvent>,
+    runtime: crate::KbWsHybridRuntimeService,
+) -> tokio::task::AbortHandle {
+    let task = tokio::spawn(async move {
+        loop {
+            let recv_result = receiver.recv().await;
+            match recv_result {
+                Ok(event) => {
+                    handle_hybrid_observation_manager_event(&runtime, event).await;
+                }
+                // Lagged: the broadcast ring overwrote entries; skip and keep draining.
+                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
+                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+                    break;
+                }
+            }
+        }
+    });
     task.abort_handle()
 }
 
+async fn handle_hybrid_observation_manager_event(
+    runtime: &crate::KbWsHybridRuntimeService,
+    event: crate::WsEvent,
+) {
+    match event {
+        crate::WsEvent::SubscriptionNotification {
+            endpoint_name,
+            subscription,
+            notification,
+            ..
+        } => {
+            handle_hybrid_subscription_notification(
+                runtime,
+                endpoint_name,
+                subscription,
+                notification,
+            )
+            .await;
+        }
+        crate::WsEvent::JsonRpcNotificationWithoutSubscription {
+            endpoint_name,
+            notification,
+        } => {
+            if notification.method == "logsNotification" {
+                let value_result = serde_json::to_value(notification.clone());
+                let value = match value_result {
+                    Ok(value) => value,
+                    Err(error) => {
+                        tracing::warn!(
+                            target: "kb_lib::ws_manager",
+                            "cannot serialize logs notification for hybrid relay: {}",
+                            error
+                        );
+                        return;
+                    }
+                };
+                let record_result = runtime
+                    .record_logs_notification(Some(endpoint_name), &value)
+                    .await;
+                if let Err(error) = record_result {
+                    tracing::warn!(
+                        target: "kb_lib::ws_manager",
+                        "hybrid logs observation failed: {}",
+                        error
+                    );
+                }
+            }
+        }
+        _ => {}
+    }
+}
+
+async fn handle_hybrid_subscription_notification(
+    runtime: &crate::KbWsHybridRuntimeService,
+    endpoint_name: std::string::String,
+    subscription: crate::WsSubscriptionInfo,
+    notification: crate::KbJsonRpcWsNotification,
+) {
+    let value_result = serde_json::to_value(notification.clone());
+    let value = match value_result {
+        Ok(value) => value,
+        Err(error) => {
+            tracing::warn!(
+                target: "kb_lib::ws_manager",
+                "cannot serialize subscription notification for hybrid relay: {}",
+                error
+            );
+            return;
+        }
+    };
+    let method = notification.method.as_str();
+    if method == "logsNotification" {
+        let record_result = runtime
+            .record_logs_notification(Some(endpoint_name), &value)
+            .await;
+        if let Err(error) = record_result {
+            tracing::warn!(
+                target: "kb_lib::ws_manager",
+                "hybrid logs observation failed: {}",
+                error
+            );
+        }
+        return;
+    }
+    if method == "programNotification" {
+        let watched_program_id = kb_first_subscription_param_as_string(&subscription);
+        let watched_program_id = match watched_program_id {
+            Some(watched_program_id) => watched_program_id,
+            None => {
+                tracing::warn!(
+                    target: "kb_lib::ws_manager",
+                    "missing watched program id in subscription params"
+                );
+                return;
+            }
+        };
+        let record_result = runtime
+            .record_program_notification(Some(endpoint_name), watched_program_id, &value)
+            .await;
+        if let Err(error) = record_result {
+            tracing::warn!(
+                target: "kb_lib::ws_manager",
+                "hybrid program observation failed: {}",
+                error
+            );
+        }
+        return;
+    }
+    if method == "accountNotification" {
+        let watched_account = kb_first_subscription_param_as_string(&subscription);
+        let watched_account = match watched_account {
+            Some(watched_account) => watched_account,
+            None => {
+                tracing::warn!(
+                    target: "kb_lib::ws_manager",
+                    "missing watched account in subscription params"
+                );
+                return;
+            }
+        };
+        let record_result = runtime
+            .record_account_notification(Some(endpoint_name), watched_account, &value)
+            .await;
+        if let Err(error) = record_result {
+            tracing::warn!(
+                target: "kb_lib::ws_manager",
+                "hybrid account observation failed: {}",
+                error
+            );
+        }
+    }
+}
+
+fn kb_first_subscription_param_as_string(
+    subscription: &crate::WsSubscriptionInfo,
+) -> std::option::Option<std::string::String> {
+    let first_param_option = subscription.params.first();
+    let first_param = match first_param_option {
+        Some(first_param) => first_param,
+        None => return None,
+    };
+    let text_option = first_param.as_str();
+    match text_option {
+        Some(text) => Some(text.to_string()),
+        None => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     #[derive(Debug)]
@@ -811,6 +1029,213 @@ mod tests {
             }
         }
 
+        async fn spawn_hybrid_json_rpc_server() -> Self {
+            let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
+            let listener = match listener_result {
+                Ok(listener) => listener,
+                Err(error) => panic!("tcp bind failed: {error}"),
+            };
+            let local_addr_result = listener.local_addr();
+            let local_addr = match local_addr_result {
+                Ok(local_addr) => local_addr,
+                Err(error) => panic!("local_addr failed: {error}"),
+            };
+            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
+            tokio::spawn(async move {
+                loop {
+                    tokio::select! {
+                        _ = &mut shutdown_rx => {
+                            break;
+                        }
+                        accept_result = listener.accept() => {
+                            let (stream, _) = match accept_result {
+                                Ok(pair) => pair,
+                                Err(_) => break,
+                            };
+                            tokio::spawn(async move {
+                                let accept_result = tokio_tungstenite::accept_async(stream).await;
+                                let mut websocket = match accept_result {
+                                    Ok(websocket) => websocket,
+                                    Err(_) => return,
+                                };
+                                loop {
+                                    let next_result = futures_util::StreamExt::next(&mut websocket).await;
+                                    let message_result = match next_result {
+                                        Some(message_result) => message_result,
+                                        None => break,
+                                    };
+                                    let message = match message_result {
+                                        Ok(message) => message,
+                                        Err(_) => break,
+                                    };
+                                    match message {
+                                        tokio_tungstenite::tungstenite::Message::Text(text) => {
+                                            let parse_result: Result<serde_json::Value, serde_json::Error> =
+                                                serde_json::from_str(text.as_str());
+                                            let request_value = match parse_result {
+                                                Ok(request_value) => request_value,
+                                                Err(_) => continue,
+                                            };
+                                            let method_option = request_value
+                                                .get("method")
+                                                .and_then(serde_json::Value::as_str);
+                                            let method = match method_option {
+                                                Some(method) => method,
+                                                None => continue,
+                                            };
+                                            let id_value = match request_value.get("id") {
+                                                Some(id_value) => id_value.clone(),
+                                                None => serde_json::Value::from(1_u64),
+                                            };
+                                            if method == "logsSubscribe" {
+                                                let success_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "result": 701_u64,
+                                                    "id": id_value,
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                                let notification_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "method": "logsNotification",
+                                                    "params": {
+                                                        "result": {
+                                                            "context": {
+                                                                "slot": 1001_u64
+                                                            },
+                                                            "value": {
+                                                                "signature": "HybridLogsSig111",
+                                                                "err": null,
+                                                                "logs": [
+                                                                    "Program log: Instruction: Swap"
+                                                                ]
+                                                            }
+                                                        },
+                                                        "subscription": 701_u64
+                                                    }
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                            } else if method == "programSubscribe" {
+                                                let success_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "result": 702_u64,
+                                                    "id": id_value,
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                                let notification_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "method": "programNotification",
+                                                    "params": {
+                                                        "result": {
+                                                            "context": {
+                                                                "slot": 1002_u64
+                                                            },
+                                                            "value": {
+                                                                "pubkey": "HybridProgramOwned111",
+                                                                "account": {
+                                                                    "lamports": 1,
+                                                                    "owner": "HybridProgram111"
+                                                                }
+                                                            }
+                                                        },
+                                                        "subscription": 702_u64
+                                                    }
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                            } else if method == "accountSubscribe" {
+                                                let success_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "result": 703_u64,
+                                                    "id": id_value,
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(success_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                                let notification_text = serde_json::json!({
+                                                    "jsonrpc": "2.0",
+                                                    "method": "accountNotification",
+                                                    "params": {
+                                                        "result": {
+                                                            "context": {
+                                                                "slot": 1003_u64
+                                                            },
+                                                            "value": {
+                                                                "lamports": 10,
+                                                                "owner": "HybridProgram111"
+                                                            }
+                                                        },
+                                                        "subscription": 703_u64
+                                                    }
+                                                }).to_string();
+                                                let send_result = futures_util::SinkExt::send(
+                                                    &mut websocket,
+                                                    tokio_tungstenite::tungstenite::Message::Text(notification_text.into()),
+                                                ).await;
+                                                if send_result.is_err() {
+                                                    break;
+                                                }
+                                            }
+                                        }
+                                        tokio_tungstenite::tungstenite::Message::Ping(data) => {
+                                            let send_result = futures_util::SinkExt::send(
+                                                &mut websocket,
+                                                tokio_tungstenite::tungstenite::Message::Pong(data),
+                                            ).await;
+                                            if send_result.is_err() {
+                                                break;
+                                            }
+                                        }
+                                        tokio_tungstenite::tungstenite::Message::Close(frame) => {
+                                            let send_result = futures_util::SinkExt::send(
+                                                &mut websocket,
+                                                tokio_tungstenite::tungstenite::Message::Close(frame),
+                                            ).await;
+                                            let _ = send_result;
+                                            break;
+                                        }
+                                        tokio_tungstenite::tungstenite::Message::Binary(_) => {}
+                                        tokio_tungstenite::tungstenite::Message::Pong(_) => {}
+                                        tokio_tungstenite::tungstenite::Message::Frame(_) => {}
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+            });
+            Self {
+                url: format!("ws://{}", local_addr),
+                shutdown_tx: Some(shutdown_tx),
+            }
+        }
+
         async fn shutdown(mut self) {
             if let Some(shutdown_tx) = self.shutdown_tx.take() {
                 let _ = shutdown_tx.send(());
@@ -1096,4 +1521,151 @@
         assert_eq!(program_endpoints, vec!["ws_b".to_string()]);
         assert!(unknown_endpoints.is_empty());
     }
+
+    async fn seed_hybrid_watch_database(database: &crate::KbDatabase) {
+        let dex_a = crate::KbDexDto::new(
+            "raydium_amm_v4".to_string(),
+            "Raydium AmmV4".to_string(),
+            Some("HybridRaydiumProgram111".to_string()),
+            None,
+            true,
+        );
+        let dex_a_id_result = crate::upsert_dex(database, &dex_a).await;
+        let dex_a_id = match dex_a_id_result {
+            Ok(dex_a_id) => dex_a_id,
+            Err(error) => panic!("dex A upsert failed: {error}"),
+        };
+        let dex_b = crate::KbDexDto::new(
+            "meteora_dbc".to_string(),
+            "Meteora DBC".to_string(),
+            Some("HybridMeteoraProgram111".to_string()),
+            Some("HybridSharedRouter111".to_string()),
+            true,
+        );
+        let dex_b_id_result = crate::upsert_dex(database, &dex_b).await;
+        let dex_b_id = match dex_b_id_result {
+            Ok(dex_b_id) => dex_b_id,
+            Err(error) => panic!("dex B upsert failed: {error}"),
+        };
+        let pool_a = crate::KbPoolDto::new(
+            dex_a_id,
+            "HybridPool111".to_string(),
+            crate::KbPoolKind::Amm,
+            crate::KbPoolStatus::Active,
+        );
+        let pool_a_result = crate::upsert_pool(database, &pool_a).await;
+        if let Err(error) = pool_a_result {
+            panic!("pool A upsert failed: {error}");
+        }
+        let pool_b = crate::KbPoolDto::new(
+            dex_b_id,
+            "HybridPool222".to_string(),
+            crate::KbPoolKind::Amm,
+            crate::KbPoolStatus::Inactive,
+        );
+        let pool_b_result = crate::upsert_pool(database, &pool_b).await;
+        if let Err(error) = pool_b_result {
+            panic!("pool B upsert failed: {error}");
+        }
+    }
+
+    #[tokio::test]
+    async fn collect_hybrid_watch_snapshot_reads_dexes_and_active_pools() {
+        let endpoints = vec![make_ws_endpoint(
+            "ws_a",
+            "ws://127.0.0.1:1".to_string(),
+            true,
+        )];
+        let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
+        let manager = match manager_result {
+            Ok(manager) => manager,
+            Err(error) => panic!("from_ws_endpoints failed: {error}"),
+        };
+
+        let database = create_database().await;
+        seed_hybrid_watch_database(&database).await;
+        let database = std::sync::Arc::new(database);
+
+        let snapshot_result = manager.collect_hybrid_watch_snapshot(database).await;
+        let snapshot = match snapshot_result {
+            Ok(snapshot) => snapshot,
+            Err(error) => panic!("collect_hybrid_watch_snapshot failed: {error}"),
+        };
+
+        assert_eq!(snapshot.program_targets.len(), 3);
+        assert_eq!(snapshot.account_targets.len(), 1);
+        assert_eq!(
+            snapshot.account_targets[0].address,
+            "HybridPool111".to_string()
+        );
+    }
+
+    #[tokio::test]
+    async fn attach_hybrid_observation_relay_records_logs_program_and_account_notifications() {
+        let server = TestWsServer::spawn_hybrid_json_rpc_server().await;
+        let endpoints = vec![make_ws_endpoint("ws_a", server.url.clone(), true)];
+        let manager_result = crate::WsManager::from_ws_endpoints(&endpoints);
+        let manager = match manager_result {
+            Ok(manager) => manager,
+            Err(error) => panic!("from_ws_endpoints failed: {error}"),
+        };
+        let database = create_database().await;
+        let database = std::sync::Arc::new(database);
+        let attach_result = manager
+            .attach_hybrid_observation_relay(database.clone())
+            .await;
+        if let Err(error) = attach_result {
+            panic!("attach_hybrid_observation_relay failed: {error}");
+        }
+        let start_result = manager.start_all().await;
+        if let Err(error) = start_result {
+            panic!("start_all failed: {error}");
+        }
+        let client_option = manager.client("ws_a").await;
+        let client = match client_option {
+            Some(client) => client,
+            None => panic!("client must exist"),
+        };
+        let logs_subscribe_result = client
+            .logs_subscribe_raw(serde_json::json!("all"), None)
+            .await;
+        if let Err(error) = logs_subscribe_result {
+            panic!("logsSubscribe failed: {error}");
+        }
+        let program_subscribe_result = client
+            .program_subscribe_raw("HybridProgram111".to_string(), None)
+            .await;
+        if let Err(error) = program_subscribe_result {
+            panic!("programSubscribe failed: {error}");
+        }
+        let account_subscribe_result = client
+            .account_subscribe_raw("HybridPool111".to_string(), None)
+            .await;
+        if let Err(error) = account_subscribe_result {
+            panic!("accountSubscribe failed: {error}");
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
+        let observations_result =
+            crate::list_recent_onchain_observations(database.as_ref(), 20).await;
+        let observations = match observations_result {
+            Ok(observations) => observations,
+            Err(error) => panic!("list_recent_onchain_observations failed: {error}"),
+        };
+        let mut kinds = std::collections::BTreeSet::new();
+        for observation in &observations {
+            kinds.insert(observation.observation_kind.clone());
+        }
+        assert!(kinds.contains("ws.hybrid.logs_notification"));
+        assert!(kinds.contains("ws.hybrid.program_notification"));
+        assert!(kinds.contains("ws.hybrid.account_notification"));
+        let stop_result = manager.stop_all().await;
+        if let Err(error) = stop_result {
+            panic!("stop_all failed: {error}");
+        }
+        let detach_result = manager.detach_hybrid_observation_relay().await;
+        if let Err(error) = detach_result {
+            panic!("detach_hybrid_observation_relay failed: {error}");
+        }
+        server.shutdown().await;
+    }
 }