diff --git a/CHANGELOG.md b/CHANGELOG.md index 01c57f6..79e713c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,3 +24,4 @@ 0.5.5 - Ajout des événements métier normalisés pour les swaps, liquidités, mints et burns de tokens 0.5.6 - Consolidation de la couche stockage : activation des foreign keys SQLite, lectures ciblées sur le modèle métier normalisé, index supplémentaires et tests unitaires dédiés 0.6.0 - Ajout du pipeline de détection technique : façade de persistance pour observations on-chain, signaux d’analyse et candidats tokens depuis les connecteurs RPC +0.6.1 - Ajout du bridge de détection Solana WS : notifications JSON-RPC persistées en observations, avec détection initiale des mints SPL / Token-2022 depuis programNotification diff --git a/Cargo.toml b/Cargo.toml index 73c6ea3..7c60452 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.0" +version = "0.6.1" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index ddcf940..8d24ddf 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -394,14 +394,12 @@ Objectif : relier les connecteurs RPC à la couche de stockage technique et mét - préparer les prochaines étapes de détection technique on-chain / RPC. ### 6.26. Version `0.6.1` — Détection technique RPC -Objectif : brancher les premiers watchers et règles techniques sur la façade de détection. +Réalisé : -À faire : - -- relier les notifications WS / RPC au pipeline de détection, -- produire des observations on-chain normalisées, -- générer les premiers signaux techniques exploitables, -- préparer la découverte effective des tokens et pools avant les connecteurs DEX dédiés. +- ajout d’un bridge `Solana WS notification -> pipeline de détection`, +- persistance des notifications WS utiles comme observations on-chain normalisées, +- génération d’un candidat token quand une `programNotification` expose un mint SPL / Token-2022 en JSON parsé, +- préparation du branchement futur des watchers et règles RPC réelles sur une façade de détection unique. ### 6.27. Version `0.7.x` — DEX connectors v1 Objectif : structurer les connecteurs par protocole. diff --git a/kb_lib/src/detect.rs b/kb_lib/src/detect.rs index bb57f2a..bef0f8a 100644 --- a/kb_lib/src/detect.rs +++ b/kb_lib/src/detect.rs @@ -7,9 +7,12 @@ //! candidate tokens are persisted before richer detection logic is added. mod service; +mod solana_ws; mod types; pub use crate::detect::service::KbDetectionPersistenceService; +pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome; +pub use crate::detect::solana_ws::KbSolanaWsDetectionService; pub use crate::detect::types::KbDetectionObservationInput; pub use crate::detect::types::KbDetectionSignalInput; pub use crate::detect::types::KbDetectionTokenCandidateInput; diff --git a/kb_lib/src/detect/solana_ws.rs b/kb_lib/src/detect/solana_ws.rs new file mode 100644 index 0000000..82d6bed --- /dev/null +++ b/kb_lib/src/detect/solana_ws.rs @@ -0,0 +1,503 @@ +// file: kb_lib/src/detect/solana_ws.rs + +//! Solana WebSocket detection bridge. +//! +//! This module converts raw Solana JSON-RPC WebSocket notifications into +//! normalized observations and, when possible, token candidates. + +/// Result of one Solana WebSocket detection pass. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KbSolanaWsDetectionOutcome { + /// The notification is currently ignored by the detector. + Ignored, + /// A technical observation was stored. + ObservationRecorded { + /// Persisted observation id. + observation_id: i64, + }, + /// A token candidate was registered. + TokenCandidateRegistered { + /// Persistence result. + result: crate::KbDetectionTokenCandidateResult, + }, +} + +/// Detection service for Solana WebSocket notifications. +#[derive(Debug, Clone)] +pub struct KbSolanaWsDetectionService { + /// Shared persistence façade. + persistence: crate::KbDetectionPersistenceService, +} + +impl KbSolanaWsDetectionService { + /// Creates a new Solana WebSocket detection service. + pub fn new(persistence: crate::KbDetectionPersistenceService) -> Self { + Self { persistence } + } + + /// Returns the shared persistence façade. + pub fn persistence(&self) -> &crate::KbDetectionPersistenceService { + &self.persistence + } + + /// Processes one Solana WebSocket JSON-RPC notification. + pub async fn process_notification( + &self, + endpoint_name: std::option::Option, + notification: &crate::KbJsonRpcWsNotification, + ) -> Result { + let observation_kind_option = + map_notification_method_to_observation_kind(notification.method.as_str()); + let observation_kind = match observation_kind_option { + Some(observation_kind) => observation_kind, + None => return Ok(crate::KbSolanaWsDetectionOutcome::Ignored), + }; + let token_candidate_result = self + .try_register_token_candidate(endpoint_name.clone(), notification) + .await; + match token_candidate_result { + Ok(Some(result)) => { + return Ok(crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { result }); + } + Ok(None) => {} + Err(error) => return Err(error), + } + let payload = build_notification_payload(notification); + let object_key = build_object_key( + notification.method.as_str(), + ¬ification.params.result, + notification.params.subscription, + ); + let slot = + extract_slot_from_result(notification.method.as_str(), ¬ification.params.result); + let observation_input = crate::KbDetectionObservationInput::new( + observation_kind, + crate::KbObservationSourceKind::WsRpc, + endpoint_name, + object_key, + slot, + payload, + ); + let observation_id_result = self + .persistence + .record_observation(&observation_input) + .await; + let observation_id = match observation_id_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + Ok(crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id }) + } + + /// Tries to register a token candidate from one notification. + async fn try_register_token_candidate( + &self, + endpoint_name: std::option::Option, + notification: &crate::KbJsonRpcWsNotification, + ) -> Result, crate::KbError> { + if notification.method.as_str() != "programNotification" { + return Ok(None); + } + let result_value = ¬ification.params.result; + let pubkey_option = extract_pubkey_from_result(result_value); + let pubkey = match pubkey_option { + Some(pubkey) => pubkey, + None => return Ok(None), + }; + let account_value_option = extract_account_value_from_result(result_value); + let account_value = match account_value_option { + Some(account_value) => account_value, + None => return Ok(None), + }; + let parsed_type_option = extract_parsed_account_type(account_value); + let parsed_type = match parsed_type_option { + Some(parsed_type) => parsed_type, + None => return Ok(None), + }; + if parsed_type.as_str() != "mint" { + return Ok(None); + } + let token_program_option = extract_account_owner(account_value); + let token_program = match token_program_option { + Some(token_program) => token_program, + None => return Ok(None), + }; + if token_program != crate::SPL_TOKEN_PROGRAM_ID.to_string() + && token_program != crate::SPL_TOKEN_2022_PROGRAM_ID.to_string() + { + return Ok(None); + } + let decimals = extract_decimals_from_account_value(account_value); + let slot = + extract_slot_from_result(notification.method.as_str(), ¬ification.params.result); + let payload = build_notification_payload(notification); + let is_quote_token = pubkey == crate::WSOL_MINT_ID.to_string(); + let input = crate::KbDetectionTokenCandidateInput::new( + pubkey, + None, + None, + decimals, + token_program, + is_quote_token, + crate::KbObservationSourceKind::WsRpc, + endpoint_name, + slot, + "ws.program_notification".to_string(), + payload.clone(), + "signal.token_mint_account_detected".to_string(), + crate::KbAnalysisSignalSeverity::Medium, + None, + Some(payload), + ); + let result = self.persistence.register_token_candidate(&input).await; + match result { + Ok(result) => Ok(Some(result)), + Err(error) => Err(error), + } + } +} + +/// Maps one WebSocket notification method to an observation kind. +fn map_notification_method_to_observation_kind( + method: &str, +) -> std::option::Option { + match method { + "accountNotification" => Some("ws.account_notification".to_string()), + "blockNotification" => Some("ws.block_notification".to_string()), + "logsNotification" => Some("ws.logs_notification".to_string()), + "programNotification" => Some("ws.program_notification".to_string()), + "rootNotification" => Some("ws.root_notification".to_string()), + "signatureNotification" => Some("ws.signature_notification".to_string()), + "slotNotification" => Some("ws.slot_notification".to_string()), + "slotsUpdatesNotification" => Some("ws.slots_updates_notification".to_string()), + "voteNotification" => Some("ws.vote_notification".to_string()), + _ => None, + } +} + +/// Wraps one raw notification into a normalized JSON payload. +fn build_notification_payload(notification: &crate::KbJsonRpcWsNotification) -> serde_json::Value { + serde_json::json!({ + "jsonrpc": notification.jsonrpc, + "method": notification.method, + "subscription": notification.params.subscription, + "result": notification.params.result, + }) +} + +/// Builds one logical object key from the notification result. +fn build_object_key( + method: &str, + result: &serde_json::Value, + subscription: u64, +) -> std::string::String { + let pubkey_option = extract_pubkey_from_result(result); + if let Some(pubkey) = pubkey_option { + return pubkey; + } + let signature_option = extract_signature_from_result(result); + if let Some(signature) = signature_option { + return signature; + } + let slot_option = extract_slot_from_result(method, result); + if let Some(slot) = slot_option { + return format!("slot:{slot}"); + } + format!("subscription:{subscription}") +} + +/// Extracts a slot number from one notification result. +fn extract_slot_from_result(method: &str, result: &serde_json::Value) -> std::option::Option { + if method == "rootNotification" { + if let Some(slot) = result.as_u64() { + return Some(slot); + } + } + if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + if let Some(context) = result.get("context") { + if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + } + if let Some(value) = result.get("value") { + if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) { + return Some(slot); + } + } + None +} + +/// Extracts a pubkey from one notification result. +fn extract_pubkey_from_result( + result: &serde_json::Value, +) -> std::option::Option { + if let Some(pubkey) = result.get("pubkey").and_then(serde_json::Value::as_str) { + return Some(pubkey.to_string()); + } + if let Some(value) = result.get("value") { + if let Some(pubkey) = value.get("pubkey").and_then(serde_json::Value::as_str) { + return Some(pubkey.to_string()); + } + } + None +} + +/// Extracts a signature from one notification result. +fn extract_signature_from_result( + result: &serde_json::Value, +) -> std::option::Option { + if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) { + return Some(signature.to_string()); + } + if let Some(value) = result.get("value") { + if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) { + return Some(signature.to_string()); + } + } + None +} + +/// Extracts one account-like JSON object from one notification result. +fn extract_account_value_from_result<'a>( + result: &'a serde_json::Value, +) -> std::option::Option<&'a serde_json::Value> { + if let Some(account) = result.get("account") { + return Some(account); + } + if let Some(value) = result.get("value") { + if let Some(account) = value.get("account") { + return Some(account); + } + if value.is_object() { + return Some(value); + } + } + None +} + +/// Extracts the parsed account type from one account-like JSON object. +fn extract_parsed_account_type( + account_value: &serde_json::Value, +) -> std::option::Option { + let data_option = account_value.get("data"); + let data = match data_option { + Some(data) => data, + None => return None, + }; + let parsed_option = data.get("parsed"); + let parsed = match parsed_option { + Some(parsed) => parsed, + None => return None, + }; + let type_option = parsed.get("type").and_then(serde_json::Value::as_str); + match type_option { + Some(parsed_type) => Some(parsed_type.to_string()), + None => None, + } +} + +/// Extracts the token program owner from one account-like JSON object. +fn extract_account_owner( + account_value: &serde_json::Value, +) -> std::option::Option { + let owner_option = account_value + .get("owner") + .and_then(serde_json::Value::as_str); + match owner_option { + Some(owner) => Some(owner.to_string()), + None => None, + } +} + +/// Extracts the decimals value from one parsed mint account-like JSON object. +fn extract_decimals_from_account_value( + account_value: &serde_json::Value, +) -> std::option::Option { + let data_option = account_value.get("data"); + let data = match data_option { + Some(data) => data, + None => return None, + }; + let parsed_option = data.get("parsed"); + let parsed = match parsed_option { + Some(parsed) => parsed, + None => return None, + }; + let info_option = parsed.get("info"); + let info = match info_option { + Some(info) => info, + None => return None, + }; + let decimals_option = info.get("decimals").and_then(serde_json::Value::as_u64); + let decimals = match decimals_option { + Some(decimals) => decimals, + None => return None, + }; + let converted = u8::try_from(decimals); + match converted { + Ok(decimals) => Some(decimals), + Err(_) => None, + } +} + +#[cfg(test)] +mod tests { + async fn create_database() -> crate::KbDatabase { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("detect_ws.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed") + } + + fn build_slot_notification() -> crate::KbJsonRpcWsNotification { + crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "slotNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::json!({ + "slot": 414726860_u64, + "parent": 414726859_u64, + "root": 414726828_u64 + }), + subscription: 1008_u64, + }, + } + } + + fn build_program_mint_notification() -> crate::KbJsonRpcWsNotification { + crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "programNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::json!({ + "context": { + "slot": 777777_u64 + }, + "value": { + "pubkey": "Mint111111111111111111111111111111111111111", + "account": { + "owner": crate::SPL_TOKEN_PROGRAM_ID.to_string(), + "data": { + "program": "spl-token", + "parsed": { + "type": "mint", + "info": { + "decimals": 6_u64 + } + } + } + } + } + }), + subscription: 2048_u64, + }, + } + } + + #[tokio::test] + async fn slot_notification_records_observation() { + let database = create_database().await; + let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let outcome_result = detector + .process_notification( + Some("mainnet_public_ws_slots".to_string()), + &build_slot_notification(), + ) + .await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => panic!("process_notification failed: {error}"), + }; + match outcome { + crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => { + assert!(observation_id > 0); + } + _ => panic!("unexpected detection outcome"), + } + let observations_result = + crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10) + .await; + let observations = match observations_result { + Ok(observations) => observations, + Err(error) => panic!("list_recent_onchain_observations failed: {error}"), + }; + assert_eq!(observations.len(), 1); + assert_eq!(observations[0].observation_kind, "ws.slot_notification"); + assert_eq!(observations[0].object_key, "slot:414726860"); + } + + #[tokio::test] + async fn program_mint_notification_registers_token_candidate() { + let database = create_database().await; + let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let outcome_result = detector + .process_notification( + Some("helius_primary_ws_programs".to_string()), + &build_program_mint_notification(), + ) + .await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => panic!("process_notification failed: {error}"), + }; + match outcome { + crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { result } => { + assert!(result.token_id > 0); + assert!(result.observation_id > 0); + assert!(result.signal_id > 0); + } + _ => panic!("unexpected detection outcome"), + } + let token_result = crate::get_token_by_mint( + detector.persistence().database().as_ref(), + "Mint111111111111111111111111111111111111111", + ) + .await; + let token_option = match token_result { + Ok(token_option) => token_option, + Err(error) => panic!("get_token_by_mint failed: {error}"), + }; + assert!(token_option.is_some()); + let observations_result = + crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10) + .await; + let observations = match observations_result { + Ok(observations) => observations, + Err(error) => panic!("list_recent_onchain_observations failed: {error}"), + }; + let signals_result = + crate::list_recent_analysis_signals(detector.persistence().database().as_ref(), 10) + .await; + let signals = match signals_result { + Ok(signals) => signals, + Err(error) => panic!("list_recent_analysis_signals failed: {error}"), + }; + assert_eq!(observations.len(), 1); + assert_eq!(signals.len(), 1); + assert_eq!( + observations[0].object_key, + "Mint111111111111111111111111111111111111111" + ); + assert_eq!( + signals[0].object_key, + "Mint111111111111111111111111111111111111111" + ); + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index b6f2925..3e31f87 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -157,3 +157,5 @@ pub use crate::detect::KbDetectionPersistenceService; pub use crate::detect::KbDetectionSignalInput; pub use crate::detect::KbDetectionTokenCandidateInput; pub use crate::detect::KbDetectionTokenCandidateResult; +pub use crate::detect::KbSolanaWsDetectionOutcome; +pub use crate::detect::KbSolanaWsDetectionService;