diff --git a/Cargo.toml b/Cargo.toml index 00d0ad1..ec728df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.5.6" +version = "0.5.7" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/app.rs b/khbb_lib/src/app.rs index 992316e..91c7347 100644 --- a/khbb_lib/src/app.rs +++ b/khbb_lib/src/app.rs @@ -24,6 +24,7 @@ pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), cra solana_ws_rpc_url = %config.solana_ws_rpc_url, yellowstone_grpc_url = ?config.yellowstone_grpc_url, bootstrap_database = config.bootstrap_database, + listener_max_ticks = config.listener_max_ticks, listener_poll_interval_ms = config.listener_poll_interval_ms, enable_ws_slot_subscribe = config.enable_ws_slot_subscribe, enable_ws_logs_subscribe = config.enable_ws_logs_subscribe, diff --git a/khbb_lib/src/candidate.rs b/khbb_lib/src/candidate.rs new file mode 100644 index 0000000..40e59de --- /dev/null +++ b/khbb_lib/src/candidate.rs @@ -0,0 +1,164 @@ +// file: khbb_lib/src/candidate.rs + +//! Candidate domain objects derived from correlated signals. + +/// Candidate object derived from a correlated signal. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KhbbCandidate { + /// Candidate token account. + TokenAccount(KhbbTokenAccountCandidate), + /// Candidate mint account. + Mint(KhbbMintCandidate), + /// Candidate bootstrap flow. + BootstrapFlow(KhbbBootstrapFlowCandidate), +} + +/// Candidate token account. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbTokenAccountCandidate { + /// Candidate pubkey. + pub pubkey: std::string::String, + /// Context slot. + pub context_slot: u64, + /// Owner program if available. + pub owner: std::option::Option, + /// Lamports if available. + pub lamports: std::option::Option, +} + +/// Candidate mint account. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbMintCandidate { + /// Candidate pubkey. + pub pubkey: std::string::String, + /// Context slot. + pub context_slot: u64, + /// Owner program if available. + pub owner: std::option::Option, + /// Lamports if available. + pub lamports: std::option::Option, +} + +/// Candidate bootstrap flow. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbBootstrapFlowCandidate { + /// Optional pubkey attached to the flow. + pub pubkey: std::option::Option, + /// Context slot. + pub context_slot: u64, + /// Whether token program activity was seen. + pub saw_token_program: bool, + /// Whether associated token account activity was seen. + pub saw_associated_token_account: bool, + /// Whether bootstrap-style logs were seen. + pub saw_bootstrap_logs: bool, +} + +/// Builds candidate objects from correlated signals. +pub(crate) fn build_candidates_from_correlated_signal( + signal: &crate::KhbbCorrelatedSignal, +) -> core::result::Result, crate::KhbbError> { + match signal { + crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(inner) => { + Ok(vec![KhbbCandidate::TokenAccount(KhbbTokenAccountCandidate { + pubkey: inner.pubkey.clone(), + context_slot: inner.context_slot, + owner: inner.owner.clone(), + lamports: inner.lamports, + })]) + }, + crate::KhbbCorrelatedSignal::PotentialNewTokenMint(inner) => { + Ok(vec![KhbbCandidate::Mint(KhbbMintCandidate { + pubkey: inner.pubkey.clone(), + context_slot: inner.context_slot, + owner: inner.owner.clone(), + lamports: inner.lamports, + })]) + }, + crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(inner) => { + Ok(vec![KhbbCandidate::BootstrapFlow(KhbbBootstrapFlowCandidate { + pubkey: inner.pubkey.clone(), + context_slot: inner.context_slot, + saw_token_program: inner.saw_token_program, + saw_associated_token_account: inner.saw_associated_token_account, + saw_bootstrap_logs: inner.saw_bootstrap_logs, + })]) + }, + } +} + +#[cfg(test)] +mod tests { + #[test] + fn build_candidate_from_confirmed_token_account_update() { + let signal = crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate( + crate::KhbbConfirmedTokenAccountUpdateSignal { + pubkey: std::string::String::from("SomePubkey"), + context_slot: 100, + owner: Some(crate::ids::SPL_TOKEN_PROGRAM_ID.to_string()), + lamports: Some(123), + }, + ); + let result = super::build_candidates_from_correlated_signal(&signal); + assert!(result.is_ok()); + let candidates = result.expect("build token account candidate"); + assert_eq!(candidates.len(), 1); + match &candidates[0] { + super::KhbbCandidate::TokenAccount(inner) => { + assert_eq!(inner.pubkey, "SomePubkey"); + }, + _ => { + panic!("expected token account candidate"); + }, + } + } + + #[test] + fn build_candidate_from_potential_new_token_mint() { + let signal = crate::KhbbCorrelatedSignal::PotentialNewTokenMint( + crate::KhbbPotentialNewTokenMintSignal { + pubkey: std::string::String::from("MintPubkey"), + context_slot: 200, + owner: Some(crate::ids::SPL_TOKEN_PROGRAM_ID.to_string()), + lamports: Some(456), + }, + ); + let result = super::build_candidates_from_correlated_signal(&signal); + assert!(result.is_ok()); + let candidates = result.expect("build mint candidate"); + assert_eq!(candidates.len(), 1); + match &candidates[0] { + super::KhbbCandidate::Mint(inner) => { + assert_eq!(inner.pubkey, "MintPubkey"); + }, + _ => { + panic!("expected mint candidate"); + }, + } + } + + #[test] + fn build_candidate_from_bootstrap_flow() { + let signal = crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow( + crate::KhbbPotentialTokenBootstrapFlowSignal { + pubkey: Some(std::string::String::from("FlowPubkey")), + context_slot: 300, + saw_token_program: true, + saw_associated_token_account: true, + saw_bootstrap_logs: true, + }, + ); + let result = super::build_candidates_from_correlated_signal(&signal); + assert!(result.is_ok()); + let candidates = result.expect("build bootstrap candidate"); + assert_eq!(candidates.len(), 1); + match &candidates[0] { + super::KhbbCandidate::BootstrapFlow(inner) => { + assert_eq!(inner.pubkey.as_deref(), Some("FlowPubkey")); + }, + _ => { + panic!("expected bootstrap flow candidate"); + }, + } + } +} diff --git a/khbb_lib/src/config.rs b/khbb_lib/src/config.rs index 0d08408..a5a4312 100644 --- a/khbb_lib/src/config.rs +++ b/khbb_lib/src/config.rs @@ -17,6 +17,8 @@ pub struct KhbbAppConfig { pub log_filter: std::string::String, /// Enables or disables database schema bootstrap at startup. pub bootstrap_database: bool, + /// Maximum number of listener loop ticks before a controlled shutdown. + pub listener_max_ticks: u64, /// Polling interval used by the current runtime skeleton. pub listener_poll_interval_ms: u64, /// Enables or disables `slotSubscribe` during listener startup. @@ -81,6 +83,11 @@ impl KhbbAppConfig { message: std::string::String::from("log_filter must not be empty"), }); } + if self.listener_max_ticks == 0 { + return Err(crate::KhbbError::Config { + message: std::string::String::from("listener_max_ticks must be greater than 0"), + }); + } if self.listener_poll_interval_ms == 0 { return Err(crate::KhbbError::Config { message: std::string::String::from( @@ -126,6 +133,7 @@ mod tests { )), log_filter: std::string::String::from("info"), bootstrap_database: true, + listener_max_ticks: 3, listener_poll_interval_ms: 1000, enable_ws_slot_subscribe: true, enable_ws_logs_subscribe: true, @@ -173,6 +181,14 @@ mod tests { assert!(result.is_err()); } + #[test] + fn validate_rejects_zero_listener_max_ticks() { + let mut config = build_valid_config(); + config.listener_max_ticks = 0; + let result = config.validate(); + assert!(result.is_err()); + } + #[test] fn validate_rejects_zero_poll_interval() { let mut config = build_valid_config(); @@ -194,6 +210,7 @@ mod tests { "yellowstone_grpc_url": "https://mainnet.helius-rpc.com:443", "log_filter": "info", "bootstrap_database": true, + "listener_max_ticks": 15, "listener_poll_interval_ms": 1000, "enable_ws_slot_subscribe": true, "enable_ws_logs_subscribe": true, diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index 8019d81..f2d8b40 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -27,6 +27,7 @@ mod heuristics; mod account_enrichment; mod enriched_classifier; mod signal_correlation; +mod candidate; /// Runs the listener application bootstrap workflow. pub use crate::app::run_listener_app; @@ -136,3 +137,11 @@ pub use crate::signal_correlation::KhbbConfirmedTokenAccountUpdateSignal; pub use crate::signal_correlation::KhbbPotentialNewTokenMintSignal; /// Correlated signal indicating a potential token bootstrap flow. pub use crate::signal_correlation::KhbbPotentialTokenBootstrapFlowSignal; +/// Candidate object derived from a correlated signal. +pub use crate::candidate::KhbbCandidate; +/// Candidate token account. +pub use crate::candidate::KhbbTokenAccountCandidate; +/// Candidate mint account. +pub use crate::candidate::KhbbMintCandidate; +/// Candidate bootstrap flow. +pub use crate::candidate::KhbbBootstrapFlowCandidate; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index 24ee1ce..09b9354 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -693,6 +693,39 @@ pub async fn run_listener_runtime( lamports = ?correlated.lamports, "correlated confirmed token account update signal" ); + let candidate_result = + crate::candidate::build_candidates_from_correlated_signal( + &crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate( + correlated.clone(), + ), + ); + match candidate_result { + Ok(candidates) => { + for candidate in candidates { + match candidate { + crate::KhbbCandidate::TokenAccount(inner) => { + tracing::trace!( + listener_session_id = session.id, + pubkey = %inner.pubkey, + context_slot = inner.context_slot, + owner = ?inner.owner, + lamports = ?inner.lamports, + "token account candidate created" + ); + } + crate::KhbbCandidate::Mint(_) => {} + crate::KhbbCandidate::BootstrapFlow(_) => {} + } + } + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to build token account candidates from correlated signal" + ); + } + } } crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(correlated) => { tracing::trace!( @@ -748,6 +781,39 @@ pub async fn run_listener_runtime( lamports = ?correlated.lamports, "correlated potential new token mint signal" ); + let candidate_result = + crate::candidate::build_candidates_from_correlated_signal( + &crate::KhbbCorrelatedSignal::PotentialNewTokenMint( + correlated.clone(), + ), + ); + match candidate_result { + Ok(candidates) => { + for candidate in candidates { + match candidate { + crate::KhbbCandidate::Mint(inner) => { + tracing::trace!( + listener_session_id = session.id, + pubkey = %inner.pubkey, + context_slot = inner.context_slot, + owner = ?inner.owner, + lamports = ?inner.lamports, + "mint candidate created" + ); + } + crate::KhbbCandidate::TokenAccount(_) => {} + crate::KhbbCandidate::BootstrapFlow(_) => {} + } + } + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to build mint candidates from correlated signal" + ); + } + } } crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(_) => {} crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(_) => {} @@ -794,6 +860,40 @@ pub async fn run_listener_runtime( saw_bootstrap_logs = correlated.saw_bootstrap_logs, "correlated potential token bootstrap flow signal" ); + let candidate_result = + crate::candidate::build_candidates_from_correlated_signal( + &crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow( + correlated.clone(), + ), + ); + match candidate_result { + Ok(candidates) => { + for candidate in candidates { + match candidate { + crate::KhbbCandidate::BootstrapFlow(inner) => { + tracing::trace!( + listener_session_id = session.id, + pubkey = ?inner.pubkey, + context_slot = inner.context_slot, + saw_token_program = inner.saw_token_program, + saw_associated_token_account = inner.saw_associated_token_account, + saw_bootstrap_logs = inner.saw_bootstrap_logs, + "bootstrap flow candidate created" + ); + } + crate::KhbbCandidate::TokenAccount(_) => {} + crate::KhbbCandidate::Mint(_) => {} + } + } + } + Err(error) => { + tracing::error!( + listener_session_id = session.id, + error = %error, + "failed to build bootstrap flow candidates from correlated signal" + ); + } + } } crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(_) => {} crate::KhbbCorrelatedSignal::PotentialNewTokenMint(_) => {} @@ -1127,7 +1227,7 @@ pub async fn run_listener_runtime( } Err(_) => {} } - if tick_count >= 3 { + if tick_count >= config.listener_max_ticks { break; } } diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs index c729720..f8a5dd9 100644 --- a/khbb_lib/src/storage.rs +++ b/khbb_lib/src/storage.rs @@ -373,6 +373,7 @@ mod tests { )), log_filter: std::string::String::from("info"), bootstrap_database: true, + listener_max_ticks: 3, listener_poll_interval_ms: 1000, enable_ws_slot_subscribe: true, enable_ws_logs_subscribe: true, @@ -500,6 +501,7 @@ WHERE id = ?1; yellowstone_grpc_url: None, log_filter: "info".into(), bootstrap_database: false, + listener_max_ticks: 3, listener_poll_interval_ms: 1000, enable_ws_slot_subscribe: true, enable_ws_logs_subscribe: true,