// file: kb_lib/src/detect/solana_ws.rs //! Solana WebSocket detection bridge. //! //! This module converts raw Solana JSON-RPC WebSocket notifications into //! normalized observations and, when possible, token candidates. /// Result of one Solana WebSocket detection pass. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum KbSolanaWsDetectionOutcome { /// The notification is currently ignored by the detector. Ignored, /// A technical observation was stored. ObservationRecorded { /// Persisted observation id. observation_id: i64, }, /// A token candidate was registered. TokenCandidateRegistered { /// Persistence result. result: crate::KbDetectionTokenCandidateResult, }, } /// Detection service for Solana WebSocket notifications. #[derive(Debug, Clone)] pub struct KbSolanaWsDetectionService { /// Shared persistence façade. persistence: crate::KbDetectionPersistenceService, } impl KbSolanaWsDetectionService { /// Creates a new Solana WebSocket detection service. pub fn new(persistence: crate::KbDetectionPersistenceService) -> Self { Self { persistence } } /// Returns the shared persistence façade. pub fn persistence(&self) -> &crate::KbDetectionPersistenceService { &self.persistence } /// Processes one Solana WebSocket JSON-RPC notification. pub async fn process_notification( &self, endpoint_name: std::option::Option, notification: &crate::KbJsonRpcWsNotification, ) -> Result { let observation_kind_option = map_notification_method_to_observation_kind(notification.method.as_str()); let observation_kind = match observation_kind_option { Some(observation_kind) => observation_kind, None => return Ok(crate::KbSolanaWsDetectionOutcome::Ignored), }; let token_candidate_result = self .try_register_token_candidate(endpoint_name.clone(), notification) .await; match token_candidate_result { Ok(Some(result)) => { return Ok(crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { result }); } Ok(None) => {} Err(error) => return Err(error), } let payload = build_notification_payload(notification); let object_key = build_object_key( notification.method.as_str(), ¬ification.params.result, notification.params.subscription, ); let slot = extract_slot_from_result(notification.method.as_str(), ¬ification.params.result); let observation_input = crate::KbDetectionObservationInput::new( observation_kind, crate::KbObservationSourceKind::WsRpc, endpoint_name, object_key, slot, payload, ); let observation_id_result = self .persistence .record_observation(&observation_input) .await; let observation_id = match observation_id_result { Ok(observation_id) => observation_id, Err(error) => return Err(error), }; Ok(crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id }) } /// Tries to register a token candidate from one notification. async fn try_register_token_candidate( &self, endpoint_name: std::option::Option, notification: &crate::KbJsonRpcWsNotification, ) -> Result, crate::KbError> { if notification.method.as_str() != "programNotification" { return Ok(None); } let result_value = ¬ification.params.result; let pubkey_option = extract_pubkey_from_result(result_value); let pubkey = match pubkey_option { Some(pubkey) => pubkey, None => return Ok(None), }; let account_value_option = extract_account_value_from_result(result_value); let account_value = match account_value_option { Some(account_value) => account_value, None => return Ok(None), }; let parsed_type_option = extract_parsed_account_type(account_value); let parsed_type = match parsed_type_option { Some(parsed_type) => parsed_type, None => return Ok(None), }; if parsed_type.as_str() != "mint" { return Ok(None); } let token_program_option = extract_account_owner(account_value); let token_program = match token_program_option { Some(token_program) => token_program, None => return Ok(None), }; if token_program != crate::SPL_TOKEN_PROGRAM_ID.to_string() && token_program != crate::SPL_TOKEN_2022_PROGRAM_ID.to_string() { return Ok(None); } let decimals = extract_decimals_from_account_value(account_value); let slot = extract_slot_from_result(notification.method.as_str(), ¬ification.params.result); let payload = build_notification_payload(notification); let is_quote_token = pubkey == crate::WSOL_MINT_ID.to_string(); let input = crate::KbDetectionTokenCandidateInput::new( pubkey, None, None, decimals, token_program, is_quote_token, crate::KbObservationSourceKind::WsRpc, endpoint_name, slot, "ws.program_notification".to_string(), payload.clone(), "signal.token_mint_account_detected".to_string(), crate::KbAnalysisSignalSeverity::Medium, None, Some(payload), ); let result = self.persistence.register_token_candidate(&input).await; match result { Ok(result) => Ok(Some(result)), Err(error) => Err(error), } } } /// Maps one WebSocket notification method to an observation kind. fn map_notification_method_to_observation_kind( method: &str, ) -> std::option::Option { match method { "accountNotification" => Some("ws.account_notification".to_string()), "blockNotification" => Some("ws.block_notification".to_string()), "logsNotification" => Some("ws.logs_notification".to_string()), "programNotification" => Some("ws.program_notification".to_string()), "rootNotification" => Some("ws.root_notification".to_string()), "signatureNotification" => Some("ws.signature_notification".to_string()), "slotNotification" => Some("ws.slot_notification".to_string()), "slotsUpdatesNotification" => Some("ws.slots_updates_notification".to_string()), "voteNotification" => Some("ws.vote_notification".to_string()), _ => None, } } /// Wraps one raw notification into a normalized JSON payload. fn build_notification_payload(notification: &crate::KbJsonRpcWsNotification) -> serde_json::Value { serde_json::json!({ "jsonrpc": notification.jsonrpc, "method": notification.method, "subscription": notification.params.subscription, "result": notification.params.result, }) } /// Builds one logical object key from the notification result. fn build_object_key( method: &str, result: &serde_json::Value, subscription: u64, ) -> std::string::String { let pubkey_option = extract_pubkey_from_result(result); if let Some(pubkey) = pubkey_option { return pubkey; } let signature_option = extract_signature_from_result(result); if let Some(signature) = signature_option { return signature; } let slot_option = extract_slot_from_result(method, result); if let Some(slot) = slot_option { return format!("slot:{slot}"); } format!("subscription:{subscription}") } /// Extracts a slot number from one notification result. fn extract_slot_from_result(method: &str, result: &serde_json::Value) -> std::option::Option { if method == "rootNotification" { if let Some(slot) = result.as_u64() { return Some(slot); } } if let Some(slot) = result.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } if let Some(context) = result.get("context") { if let Some(slot) = context.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } if let Some(value) = result.get("value") { if let Some(slot) = value.get("slot").and_then(serde_json::Value::as_u64) { return Some(slot); } } None } /// Extracts a pubkey from one notification result. fn extract_pubkey_from_result( result: &serde_json::Value, ) -> std::option::Option { if let Some(pubkey) = result.get("pubkey").and_then(serde_json::Value::as_str) { return Some(pubkey.to_string()); } if let Some(value) = result.get("value") { if let Some(pubkey) = value.get("pubkey").and_then(serde_json::Value::as_str) { return Some(pubkey.to_string()); } } None } /// Extracts a signature from one notification result. fn extract_signature_from_result( result: &serde_json::Value, ) -> std::option::Option { if let Some(signature) = result.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } if let Some(value) = result.get("value") { if let Some(signature) = value.get("signature").and_then(serde_json::Value::as_str) { return Some(signature.to_string()); } } None } /// Extracts one account-like JSON object from one notification result. fn extract_account_value_from_result<'a>( result: &'a serde_json::Value, ) -> std::option::Option<&'a serde_json::Value> { if let Some(account) = result.get("account") { return Some(account); } if let Some(value) = result.get("value") { if let Some(account) = value.get("account") { return Some(account); } if value.is_object() { return Some(value); } } None } /// Extracts the parsed account type from one account-like JSON object. fn extract_parsed_account_type( account_value: &serde_json::Value, ) -> std::option::Option { let data_option = account_value.get("data"); let data = match data_option { Some(data) => data, None => return None, }; let parsed_option = data.get("parsed"); let parsed = match parsed_option { Some(parsed) => parsed, None => return None, }; let type_option = parsed.get("type").and_then(serde_json::Value::as_str); match type_option { Some(parsed_type) => Some(parsed_type.to_string()), None => None, } } /// Extracts the token program owner from one account-like JSON object. fn extract_account_owner( account_value: &serde_json::Value, ) -> std::option::Option { let owner_option = account_value .get("owner") .and_then(serde_json::Value::as_str); match owner_option { Some(owner) => Some(owner.to_string()), None => None, } } /// Extracts the decimals value from one parsed mint account-like JSON object. fn extract_decimals_from_account_value( account_value: &serde_json::Value, ) -> std::option::Option { let data_option = account_value.get("data"); let data = match data_option { Some(data) => data, None => return None, }; let parsed_option = data.get("parsed"); let parsed = match parsed_option { Some(parsed) => parsed, None => return None, }; let info_option = parsed.get("info"); let info = match info_option { Some(info) => info, None => return None, }; let decimals_option = info.get("decimals").and_then(serde_json::Value::as_u64); let decimals = match decimals_option { Some(decimals) => decimals, None => return None, }; let converted = u8::try_from(decimals); match converted { Ok(decimals) => Some(decimals), Err(_) => None, } } #[cfg(test)] mod tests { async fn create_database() -> crate::KbDatabase { let tempdir = tempfile::tempdir().expect("tempdir must succeed"); let database_path = tempdir.path().join("detect_ws.sqlite3"); let config = crate::KbDatabaseConfig { enabled: true, backend: crate::KbDatabaseBackend::Sqlite, sqlite: crate::KbSqliteDatabaseConfig { path: database_path.to_string_lossy().to_string(), create_if_missing: true, busy_timeout_ms: 5000, max_connections: 1, auto_initialize_schema: true, use_wal: true, }, }; crate::KbDatabase::connect_and_initialize(&config) .await .expect("database init must succeed") } fn build_slot_notification() -> crate::KbJsonRpcWsNotification { crate::KbJsonRpcWsNotification { jsonrpc: "2.0".to_string(), method: "slotNotification".to_string(), params: crate::KbJsonRpcWsNotificationParams { result: serde_json::json!({ "slot": 414726860_u64, "parent": 414726859_u64, "root": 414726828_u64 }), subscription: 1008_u64, }, } } fn build_program_mint_notification() -> crate::KbJsonRpcWsNotification { crate::KbJsonRpcWsNotification { jsonrpc: "2.0".to_string(), method: "programNotification".to_string(), params: crate::KbJsonRpcWsNotificationParams { result: serde_json::json!({ "context": { "slot": 777777_u64 }, "value": { "pubkey": "Mint111111111111111111111111111111111111111", "account": { "owner": crate::SPL_TOKEN_PROGRAM_ID.to_string(), "data": { "program": "spl-token", "parsed": { "type": "mint", "info": { "decimals": 6_u64 } } } } } }), subscription: 2048_u64, }, } } #[tokio::test] async fn slot_notification_records_observation() { let database = create_database().await; let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); let detector = crate::KbSolanaWsDetectionService::new(persistence); let outcome_result = detector .process_notification( Some("mainnet_public_ws_slots".to_string()), &build_slot_notification(), ) .await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => panic!("process_notification failed: {error}"), }; match outcome { crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => { assert!(observation_id > 0); } _ => panic!("unexpected detection outcome"), } let observations_result = crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10) .await; let observations = match observations_result { Ok(observations) => observations, Err(error) => panic!("list_recent_onchain_observations failed: {error}"), }; assert_eq!(observations.len(), 1); assert_eq!(observations[0].observation_kind, "ws.slot_notification"); assert_eq!(observations[0].object_key, "slot:414726860"); } #[tokio::test] async fn program_mint_notification_registers_token_candidate() { let database = create_database().await; let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database)); let detector = crate::KbSolanaWsDetectionService::new(persistence); let outcome_result = detector .process_notification( Some("helius_primary_ws_programs".to_string()), &build_program_mint_notification(), ) .await; let outcome = match outcome_result { Ok(outcome) => outcome, Err(error) => panic!("process_notification failed: {error}"), }; match outcome { crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { result } => { assert!(result.token_id > 0); assert!(result.observation_id > 0); assert!(result.signal_id > 0); } _ => panic!("unexpected detection outcome"), } let token_result = crate::get_token_by_mint( detector.persistence().database().as_ref(), "Mint111111111111111111111111111111111111111", ) .await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => panic!("get_token_by_mint failed: {error}"), }; assert!(token_option.is_some()); let observations_result = crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10) .await; let observations = match observations_result { Ok(observations) => observations, Err(error) => panic!("list_recent_onchain_observations failed: {error}"), }; let signals_result = crate::list_recent_analysis_signals(detector.persistence().database().as_ref(), 10) .await; let signals = match signals_result { Ok(signals) => signals, Err(error) => panic!("list_recent_analysis_signals failed: {error}"), }; assert_eq!(observations.len(), 1); assert_eq!(signals.len(), 1); assert_eq!( observations[0].object_key, "Mint111111111111111111111111111111111111111" ); assert_eq!( signals[0].object_key, "Mint111111111111111111111111111111111111111" ); } }