diff --git a/CHANGELOG.md b/CHANGELOG.md index 82b271a..57e1053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,3 +27,4 @@ 0.6.1 - Ajout du bridge de détection Solana WS : notifications JSON-RPC persistées en observations, avec détection initiale des mints SPL / Token-2022 depuis programNotification 0.6.2 - Branchement de WsClient vers le pipeline de détection via un relais asynchrone de notifications JSON-RPC WebSocket 0.6.3 - Enrichissement des notifications WebSocket utiles : extraction améliorée de pubkey, signature, owner, parsed account type et slot pour account/logs/signature notifications +0.6.4 - Premières règles de détection technique pour candidats pools/listings depuis programNotification en s’appuyant sur les DEX connus en base diff --git a/Cargo.toml b/Cargo.toml index 3906389..65bfee1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.6.3" +version = "0.6.4" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 4ad38e7..5d4dee3 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -418,14 +418,13 @@ Réalisé : - préparer les règles de détection techniques réelles. ### 6.029. Version `0.6.4` — Premières règles de détection technique -Objectif : commencer la détection technique on-chain utile avant les connecteurs DEX dédiés. +Réalisé : -À faire : - -- détecter les premiers candidats pools / listings techniques, -- commencer à distinguer signaux de mint, apparition de pool et activité anormale, -- préparer l’alimentation des tables métier normalisées, -- garder la logique encore indépendante des connecteurs DEX `0.7.x`. +- détection des premiers candidats pools/listings techniques depuis `programNotification`, +- appui sur les DEX connus en base via `program_id` / `router_program_id`, +- enregistrement des pools candidats et de leur listing initial sans parsing DEX complet, +- alimentation conjointe des observations techniques, signaux d’analyse et tables métier normalisées, +- maintien d’une logique encore indépendante des connecteurs DEX `0.7.x`. ### 6.030. Version `0.6.5` — Orchestration multi-clients WebSocket Objectif : préparer la gestion coordonnée de plusieurs `WsClient`. diff --git a/kb_lib/src/detect.rs b/kb_lib/src/detect.rs index 515bde9..0ac8b00 100644 --- a/kb_lib/src/detect.rs +++ b/kb_lib/src/detect.rs @@ -15,6 +15,8 @@ pub use crate::detect::service::KbDetectionPersistenceService; pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome; pub use crate::detect::solana_ws::KbSolanaWsDetectionService; pub use crate::detect::types::KbDetectionObservationInput; +pub use crate::detect::types::KbDetectionPoolCandidateInput; +pub use crate::detect::types::KbDetectionPoolCandidateResult; pub use crate::detect::types::KbDetectionSignalInput; pub use crate::detect::types::KbDetectionTokenCandidateInput; pub use crate::detect::types::KbDetectionTokenCandidateResult; diff --git a/kb_lib/src/detect/service.rs b/kb_lib/src/detect/service.rs index b6c85fc..f14298a 100644 --- a/kb_lib/src/detect/service.rs +++ b/kb_lib/src/detect/service.rs @@ -14,7 +14,6 @@ impl KbDetectionPersistenceService { pub fn new(database: std::sync::Arc) -> Self { Self { database } } - /// Returns the shared database handle. pub fn database(&self) -> &std::sync::Arc { &self.database @@ -111,6 +110,125 @@ impl KbDetectionPersistenceService { signal_id, }) } + + /// Registers one pool candidate from a technical source. + /// + /// This method: + /// - resolves one known DEX from the supplied program id, + /// - upserts one normalized pool entry, + /// - upserts one pool listing row, + /// - stores the technical observation and derived signal. + pub async fn register_pool_candidate( + &self, + input: &crate::KbDetectionPoolCandidateInput, + ) -> Result { + let dexes_result = crate::list_dexes(self.database.as_ref()).await; + let dexes = match dexes_result { + Ok(dexes) => dexes, + Err(error) => return Err(error), + }; + let mut matched_dex_option: std::option::Option = None; + for dex in dexes { + let mut matched = false; + if let Some(program_id) = &dex.program_id { + if program_id == &input.dex_program_id { + matched = true; + } + } + if !matched { + if let Some(router_program_id) = &dex.router_program_id { + if router_program_id == &input.dex_program_id { + matched = true; + } + } + } + if matched { + matched_dex_option = Some(dex); + break; + } + } + let matched_dex = match matched_dex_option { + Some(matched_dex) => matched_dex, + None => { + return Err(crate::KbError::Db(format!( + "cannot register pool candidate: no known dex matches program id '{}'", + input.dex_program_id + ))); + } + }; + let dex_id = match matched_dex.id { + Some(dex_id) => dex_id, + None => { + return Err(crate::KbError::Db( + "cannot register pool candidate: matched dex has no id".to_string(), + )); + } + }; + let pool_dto = crate::KbPoolDto::new( + dex_id, + input.pool_address.clone(), + crate::KbPoolKind::Unknown, + crate::KbPoolStatus::Pending, + ); + let pool_id_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await; + let pool_id = match pool_id_result { + Ok(pool_id) => pool_id, + Err(error) => return Err(error), + }; + let listing_dto = crate::KbPoolListingDto::new( + dex_id, + pool_id, + None, + input.source_kind, + input.endpoint_name.clone(), + None, + None, + None, + ); + let pool_listing_id_result = + crate::upsert_pool_listing(self.database.as_ref(), &listing_dto).await; + let pool_listing_id = match pool_listing_id_result { + Ok(pool_listing_id) => pool_listing_id, + Err(error) => return Err(error), + }; + let observation_input = crate::KbDetectionObservationInput::new( + input.observation_kind.clone(), + input.source_kind, + input.endpoint_name.clone(), + input.pool_address.clone(), + input.slot, + input.observation_payload.clone(), + ); + let observation_id_result = self.record_observation(&observation_input).await; + let observation_id = match observation_id_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_payload = match &input.signal_payload { + Some(signal_payload) => signal_payload.clone(), + None => input.observation_payload.clone(), + }; + let signal_input = crate::KbDetectionSignalInput::new( + input.signal_kind.clone(), + input.signal_severity, + input.pool_address.clone(), + Some(observation_id), + input.signal_score, + signal_payload, + ); + let signal_id_result = self.record_signal(&signal_input).await; + let signal_id = match signal_id_result { + Ok(signal_id) => signal_id, + Err(error) => return Err(error), + }; + Ok(crate::KbDetectionPoolCandidateResult { + dex_id, + pool_id, + pool_listing_id, + observation_id, + signal_id, + }) + } } #[cfg(test)] @@ -247,4 +365,60 @@ mod tests { Some(result.observation_id) ); } + + #[tokio::test] + async fn register_pool_candidate_persists_pool_listing_observation_and_signal() { + let database = create_database().await; + let service = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let dex_id = crate::upsert_dex( + service.database().as_ref(), + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + Some("DexProgram111111111111111111111111111111111".to_string()), + None, + true, + ), + ) + .await + .expect("upsert dex must succeed"); + assert!(dex_id > 0); + let result = service + .register_pool_candidate(&crate::KbDetectionPoolCandidateInput::new( + "Pool111111111111111111111111111111111111111".to_string(), + "DexProgram111111111111111111111111111111111".to_string(), + crate::KbObservationSourceKind::WsRpc, + Some("helius_primary_ws_programs".to_string()), + Some(999999), + "ws.program_notification".to_string(), + serde_json::json!({ + "pool": "Pool111111111111111111111111111111111111111" + }), + "signal.pool_candidate.raydium".to_string(), + crate::KbAnalysisSignalSeverity::Medium, + None, + None, + )) + .await + .expect("register pool candidate must succeed"); + assert!(result.dex_id > 0); + assert!(result.pool_id > 0); + assert!(result.pool_listing_id > 0); + assert!(result.observation_id > 0); + assert!(result.signal_id > 0); + let pool = crate::get_pool_by_address( + service.database().as_ref(), + "Pool111111111111111111111111111111111111111", + ) + .await + .expect("get pool must succeed"); + assert!(pool.is_some()); + let pool = pool.expect("pool must exist"); + assert_eq!(pool.status, crate::KbPoolStatus::Pending); + let listing = + crate::get_pool_listing_by_pool_id(service.database().as_ref(), result.pool_id) + .await + .expect("get listing must succeed"); + assert!(listing.is_some()); + } } diff --git a/kb_lib/src/detect/solana_ws.rs b/kb_lib/src/detect/solana_ws.rs index c806d13..959da6a 100644 --- a/kb_lib/src/detect/solana_ws.rs +++ b/kb_lib/src/detect/solana_ws.rs @@ -3,7 +3,7 @@ //! Solana WebSocket detection bridge. //! //! This module converts raw Solana JSON-RPC WebSocket notifications into -//! normalized observations and, when possible, token candidates. +//! normalized observations and, when possible, token and pool candidates. /// Result of one Solana WebSocket detection pass. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -20,6 +20,11 @@ pub enum KbSolanaWsDetectionOutcome { /// Persistence result. result: crate::KbDetectionTokenCandidateResult, }, + /// A pool candidate was registered. + PoolCandidateRegistered { + /// Persistence result. + result: crate::KbDetectionPoolCandidateResult, + }, } /// Detection service for Solana WebSocket notifications. @@ -62,6 +67,16 @@ impl KbSolanaWsDetectionService { Ok(None) => {} Err(error) => return Err(error), } + let pool_candidate_result = self + .try_register_pool_candidate(endpoint_name.clone(), notification) + .await; + match pool_candidate_result { + Ok(Some(result)) => { + return Ok(crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { result }); + } + Ok(None) => {} + Err(error) => return Err(error), + } let payload = build_notification_payload(notification); let object_key = build_object_key( notification.method.as_str(), @@ -199,6 +214,84 @@ impl KbSolanaWsDetectionService { Err(error) => Err(error), } } + + /// Tries to register a pool candidate from one notification. + async fn try_register_pool_candidate( + &self, + endpoint_name: std::option::Option, + notification: &crate::KbJsonRpcWsNotification, + ) -> Result, crate::KbError> { + if notification.method.as_str() != "programNotification" { + return Ok(None); + } + let result_value = ¬ification.params.result; + let pubkey_option = extract_pubkey_from_result(result_value); + let pubkey = match pubkey_option { + Some(pubkey) => pubkey, + None => return Ok(None), + }; + let owner_option = extract_program_notification_owner(result_value); + let owner = match owner_option { + Some(owner) => owner, + None => return Ok(None), + }; + if owner == crate::SPL_TOKEN_PROGRAM_ID.to_string() + || owner == crate::SPL_TOKEN_2022_PROGRAM_ID.to_string() + { + return Ok(None); + } + let dexes_result = crate::list_dexes(self.persistence.database().as_ref()).await; + let dexes = match dexes_result { + Ok(dexes) => dexes, + Err(error) => return Err(error), + }; + let mut matched_dex_option: std::option::Option = None; + for dex in dexes { + let mut matched = false; + if let Some(program_id) = &dex.program_id { + if program_id == &owner { + matched = true; + } + } + if !matched { + if let Some(router_program_id) = &dex.router_program_id { + if router_program_id == &owner { + matched = true; + } + } + } + if matched { + matched_dex_option = Some(dex); + break; + } + } + let matched_dex = match matched_dex_option { + Some(matched_dex) => matched_dex, + None => return Ok(None), + }; + let payload = build_notification_payload(notification); + let slot = + extract_slot_from_result(notification.method.as_str(), ¬ification.params.result); + let signal_kind = format!("signal.pool_candidate.{}", matched_dex.code); + let input = crate::KbDetectionPoolCandidateInput::new( + pubkey, + owner, + crate::KbObservationSourceKind::WsRpc, + endpoint_name, + slot, + "ws.program_notification".to_string(), + payload.clone(), + signal_kind, + crate::KbAnalysisSignalSeverity::Medium, + None, + Some(payload), + ); + let result = self.persistence.register_pool_candidate(&input).await; + match result { + Ok(result) => Ok(Some(result)), + Err(error) => Err(error), + } + } } /// Maps one WebSocket notification method to an observation kind. @@ -247,6 +340,7 @@ fn build_object_key( if let Some(slot) = slot_option { return format!("slot:{slot}"); } + format!("subscription:{subscription}") } @@ -682,6 +776,31 @@ mod tests { } } + fn build_program_pool_candidate_notification() -> crate::KbJsonRpcWsNotification { + crate::KbJsonRpcWsNotification { + jsonrpc: "2.0".to_string(), + method: "programNotification".to_string(), + params: crate::KbJsonRpcWsNotificationParams { + result: serde_json::json!({ + "context": { + "slot": 666666_u64 + }, + "value": { + "pubkey": "Pool111111111111111111111111111111111111111", + "account": { + "owner": "DexProgram111111111111111111111111111111111", + "data": [ + "AQID", + "base64" + ] + } + } + }), + subscription: 5555_u64, + }, + } + } + fn build_logs_notification() -> crate::KbJsonRpcWsNotification { crate::KbJsonRpcWsNotification { jsonrpc: "2.0".to_string(), @@ -899,4 +1018,61 @@ mod tests { "signal.signature_notification.confirmed" ); } + + #[tokio::test] + async fn program_pool_candidate_notification_registers_pool_candidate() { + let database = create_database().await; + let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); + let dex_id = crate::upsert_dex( + persistence.database().as_ref(), + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + Some("DexProgram111111111111111111111111111111111".to_string()), + None, + true, + ), + ) + .await + .expect("upsert dex must succeed"); + assert!(dex_id > 0); + let detector = crate::KbSolanaWsDetectionService::new(persistence); + let outcome_result = detector + .process_notification( + Some("helius_primary_ws_programs".to_string()), + &build_program_pool_candidate_notification(), + ) + .await; + let outcome = match outcome_result { + Ok(outcome) => outcome, + Err(error) => panic!("process_notification failed: {error}"), + }; + let pool_id = match outcome { + crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { result } => { + assert!(result.dex_id > 0); + assert!(result.pool_id > 0); + assert!(result.pool_listing_id > 0); + result.pool_id + } + _ => panic!("unexpected detection outcome"), + }; + let pool_result = crate::get_pool_by_address( + detector.persistence().database().as_ref(), + "Pool111111111111111111111111111111111111111", + ) + .await; + let pool_option = match pool_result { + Ok(pool_option) => pool_option, + Err(error) => panic!("get_pool_by_address failed: {error}"), + }; + assert!(pool_option.is_some()); + let listing_result = + crate::get_pool_listing_by_pool_id(detector.persistence().database().as_ref(), pool_id) + .await; + let listing_option = match listing_result { + Ok(listing_option) => listing_option, + Err(error) => panic!("get_pool_listing_by_pool_id failed: {error}"), + }; + assert!(listing_option.is_some()); + } } diff --git a/kb_lib/src/detect/types.rs b/kb_lib/src/detect/types.rs index 612c57a..52c7df2 100644 --- a/kb_lib/src/detect/types.rs +++ b/kb_lib/src/detect/types.rs @@ -115,6 +115,7 @@ pub struct KbDetectionTokenCandidateInput { impl KbDetectionTokenCandidateInput { /// Creates a new token candidate input. + #[allow(clippy::too_many_arguments)] pub fn new( mint: std::string::String, symbol: std::option::Option, @@ -162,3 +163,77 @@ pub struct KbDetectionTokenCandidateResult { /// Persisted signal id. pub signal_id: i64, } + +/// One pool candidate detected from a technical source. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbDetectionPoolCandidateInput { + /// Pool address. + pub pool_address: std::string::String, + /// DEX program id or router program id that matched. + pub dex_program_id: std::string::String, + /// Observation source family. + pub source_kind: crate::KbObservationSourceKind, + /// Optional source endpoint logical name. + pub endpoint_name: std::option::Option, + /// Optional slot number. + pub slot: std::option::Option, + /// Observation kind. + pub observation_kind: std::string::String, + /// Observation payload. + pub observation_payload: serde_json::Value, + /// Signal kind. + pub signal_kind: std::string::String, + /// Signal severity. + pub signal_severity: crate::KbAnalysisSignalSeverity, + /// Optional signal score. + pub signal_score: std::option::Option, + /// Optional dedicated signal payload. + pub signal_payload: std::option::Option, +} + +impl KbDetectionPoolCandidateInput { + /// Creates a new pool candidate input. + #[allow(clippy::too_many_arguments)] + pub fn new( + pool_address: std::string::String, + dex_program_id: std::string::String, + source_kind: crate::KbObservationSourceKind, + endpoint_name: std::option::Option, + slot: std::option::Option, + observation_kind: std::string::String, + observation_payload: serde_json::Value, + signal_kind: std::string::String, + signal_severity: crate::KbAnalysisSignalSeverity, + signal_score: std::option::Option, + signal_payload: std::option::Option, + ) -> Self { + Self { + pool_address, + dex_program_id, + source_kind, + endpoint_name, + slot, + observation_kind, + observation_payload, + signal_kind, + signal_severity, + signal_score, + signal_payload, + } + } +} + +/// Result of one pool candidate persistence operation. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbDetectionPoolCandidateResult { + /// Persisted dex id. + pub dex_id: i64, + /// Persisted pool id. + pub pool_id: i64, + /// Persisted pool listing id. + pub pool_listing_id: i64, + /// Persisted observation id. + pub observation_id: i64, + /// Persisted signal id. + pub signal_id: i64, +} diff --git a/kb_lib/src/detect/ws_relay.rs b/kb_lib/src/detect/ws_relay.rs index 7c907e2..fcffa81 100644 --- a/kb_lib/src/detect/ws_relay.rs +++ b/kb_lib/src/detect/ws_relay.rs @@ -38,6 +38,8 @@ pub struct KbWsDetectionRelayStats { pub observation_count: u64, /// Number of registered token candidates. pub token_candidate_count: u64, + /// Number of registered pool candidates. + pub pool_candidate_count: u64, /// Number of processing errors. pub error_count: u64, } @@ -113,6 +115,9 @@ impl KbWsDetectionRelay { crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { .. } => { stats.token_candidate_count += 1; } + crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { .. } => { + stats.pool_candidate_count += 1; + } } } stats diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 79d47a6..d9db675 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -162,3 +162,5 @@ pub use crate::detect::KbSolanaWsDetectionService; pub use crate::detect::KbWsDetectionNotificationEnvelope; pub use crate::detect::KbWsDetectionRelay; pub use crate::detect::KbWsDetectionRelayStats; +pub use crate::detect::KbDetectionPoolCandidateInput; +pub use crate::detect::KbDetectionPoolCandidateResult;