This commit is contained in:
2026-04-19 00:07:08 +02:00
parent ed490a96f6
commit bc089c1a10
6 changed files with 387 additions and 89 deletions

View File

@@ -28,6 +28,8 @@ mod account_enrichment;
mod enriched_classifier;
mod signal_correlation;
mod candidate;
mod session_candidate;
mod session_tracker;
/// Runs the listener application bootstrap workflow.
pub use crate::app::run_listener_app;
@@ -145,3 +147,11 @@ pub use crate::candidate::KhbbTokenAccountCandidate;
pub use crate::candidate::KhbbMintCandidate;
/// Candidate bootstrap flow.
pub use crate::candidate::KhbbBootstrapFlowCandidate;
/// Candidate confidence level.
pub use crate::session_candidate::KhbbCandidateConfidence;
/// Candidate tracked during the current listener session.
pub use crate::session_candidate::KhbbSessionCandidate;
/// In-memory tracker for candidates observed during a single listener session.
pub use crate::session_tracker::KhbbSessionCandidateTracker;
/// Result of inserting or updating a session candidate.
pub use crate::session_tracker::KhbbSessionCandidateUpdate;

View File

@@ -40,6 +40,7 @@ pub async fn run_listener_runtime(
let mut interval = tokio::time::interval(tick_duration);
let mut tick_count: u64 = 0;
let mut final_status = std::string::String::from("stopped");
let mut session_candidate_tracker = crate::KhbbSessionCandidateTracker::new();
let http_client_config =
crate::KhbbSolanaHttpRpcClientConfig { url: config.solana_http_rpc_url.clone() };
let http_client_result = crate::KhbbSolanaHttpRpcClient::new(http_client_config);
@@ -693,39 +694,25 @@ pub async fn run_listener_runtime(
lamports = ?correlated.lamports,
"correlated confirmed token account update signal"
);
let candidate_result =
crate::candidate::build_candidates_from_correlated_signal(
let tracker_update =
session_candidate_tracker.upsert_from_correlated_signal(
&crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(
correlated.clone(),
),
);
match candidate_result {
Ok(candidates) => {
for candidate in candidates {
match candidate {
crate::KhbbCandidate::TokenAccount(inner) => {
tracing::trace!(
listener_session_id = session.id,
pubkey = %inner.pubkey,
context_slot = inner.context_slot,
owner = ?inner.owner,
lamports = ?inner.lamports,
"token account candidate created"
);
}
crate::KhbbCandidate::Mint(_) => {}
crate::KhbbCandidate::BootstrapFlow(_) => {}
}
}
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to build token account candidates from correlated signal"
);
}
}
tracing::trace!(
listener_session_id = session.id,
is_new = tracker_update.is_new,
key = %tracker_update.candidate.key,
category = %tracker_update.candidate.category,
pubkey = ?tracker_update.candidate.pubkey,
first_seen_slot = tracker_update.candidate.first_seen_slot,
last_seen_slot = tracker_update.candidate.last_seen_slot,
seen_count = tracker_update.candidate.seen_count,
score = tracker_update.candidate.score,
confidence = ?tracker_update.candidate.confidence,
"token account session candidate upserted"
);
}
crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(correlated) => {
tracing::trace!(
@@ -781,39 +768,25 @@ pub async fn run_listener_runtime(
lamports = ?correlated.lamports,
"correlated potential new token mint signal"
);
let candidate_result =
crate::candidate::build_candidates_from_correlated_signal(
let tracker_update =
session_candidate_tracker.upsert_from_correlated_signal(
&crate::KhbbCorrelatedSignal::PotentialNewTokenMint(
correlated.clone(),
),
);
match candidate_result {
Ok(candidates) => {
for candidate in candidates {
match candidate {
crate::KhbbCandidate::Mint(inner) => {
tracing::trace!(
listener_session_id = session.id,
pubkey = %inner.pubkey,
context_slot = inner.context_slot,
owner = ?inner.owner,
lamports = ?inner.lamports,
"mint candidate created"
);
}
crate::KhbbCandidate::TokenAccount(_) => {}
crate::KhbbCandidate::BootstrapFlow(_) => {}
}
}
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to build mint candidates from correlated signal"
);
}
}
tracing::trace!(
listener_session_id = session.id,
is_new = tracker_update.is_new,
key = %tracker_update.candidate.key,
category = %tracker_update.candidate.category,
pubkey = ?tracker_update.candidate.pubkey,
first_seen_slot = tracker_update.candidate.first_seen_slot,
last_seen_slot = tracker_update.candidate.last_seen_slot,
seen_count = tracker_update.candidate.seen_count,
score = tracker_update.candidate.score,
confidence = ?tracker_update.candidate.confidence,
"mint session candidate upserted"
);
}
crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(_) => {}
crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(_) => {}
@@ -860,40 +833,25 @@ pub async fn run_listener_runtime(
saw_bootstrap_logs = correlated.saw_bootstrap_logs,
"correlated potential token bootstrap flow signal"
);
let candidate_result =
crate::candidate::build_candidates_from_correlated_signal(
let tracker_update =
session_candidate_tracker.upsert_from_correlated_signal(
&crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(
correlated.clone(),
),
);
match candidate_result {
Ok(candidates) => {
for candidate in candidates {
match candidate {
crate::KhbbCandidate::BootstrapFlow(inner) => {
tracing::trace!(
listener_session_id = session.id,
pubkey = ?inner.pubkey,
context_slot = inner.context_slot,
saw_token_program = inner.saw_token_program,
saw_associated_token_account = inner.saw_associated_token_account,
saw_bootstrap_logs = inner.saw_bootstrap_logs,
"bootstrap flow candidate created"
);
}
crate::KhbbCandidate::TokenAccount(_) => {}
crate::KhbbCandidate::Mint(_) => {}
}
}
}
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to build bootstrap flow candidates from correlated signal"
);
}
}
tracing::trace!(
listener_session_id = session.id,
is_new = tracker_update.is_new,
key = %tracker_update.candidate.key,
category = %tracker_update.candidate.category,
pubkey = ?tracker_update.candidate.pubkey,
first_seen_slot = tracker_update.candidate.first_seen_slot,
last_seen_slot = tracker_update.candidate.last_seen_slot,
seen_count = tracker_update.candidate.seen_count,
score = tracker_update.candidate.score,
confidence = ?tracker_update.candidate.confidence,
"bootstrap flow session candidate upserted"
);
}
crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(_) => {}
crate::KhbbCorrelatedSignal::PotentialNewTokenMint(_) => {}

View File

@@ -0,0 +1,35 @@
// file: khbb_lib/src/session_candidate.rs
//! Short-lived in-memory candidate tracking for a single listener session.
/// Candidate confidence level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum KhbbCandidateConfidence {
/// Weak confidence.
Low,
/// Medium confidence.
Medium,
/// Strong confidence.
High,
}
/// Candidate tracked during the current listener session.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbSessionCandidate {
/// Stable local key used for deduplication.
pub key: std::string::String,
/// Candidate category.
pub category: std::string::String,
/// Optional pubkey attached to the candidate.
pub pubkey: std::option::Option<std::string::String>,
/// First slot at which the candidate was seen.
pub first_seen_slot: u64,
/// Most recent slot at which the candidate was seen.
pub last_seen_slot: u64,
/// Number of times this candidate was observed during the session.
pub seen_count: u64,
/// Lightweight numeric score.
pub score: u64,
/// Confidence level derived from the score.
pub confidence: KhbbCandidateConfidence,
}

View File

@@ -0,0 +1,211 @@
// file: khbb_lib/src/session_tracker.rs
//! In-memory session tracking for correlated candidates.
/// In-memory tracker for candidates observed during a single listener session.
#[derive(Debug, Default)]
pub struct KhbbSessionCandidateTracker {
candidates: std::collections::BTreeMap<std::string::String, crate::KhbbSessionCandidate>,
}
/// Result of inserting or updating a session candidate.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KhbbSessionCandidateUpdate {
/// Whether the candidate was newly inserted.
pub is_new: bool,
/// Current candidate snapshot after the update.
pub candidate: crate::KhbbSessionCandidate,
}
impl KhbbSessionCandidateTracker {
/// Creates a new empty tracker.
pub fn new() -> Self {
Self {
candidates: std::collections::BTreeMap::new(),
}
}
/// Upserts a candidate built from a correlated signal.
pub fn upsert_from_correlated_signal(
&mut self,
signal: &crate::KhbbCorrelatedSignal,
) -> crate::KhbbSessionCandidateUpdate {
let candidate_blueprint = build_session_candidate_from_correlated_signal(signal);
let existing_option = self.candidates.get_mut(&candidate_blueprint.key);
match existing_option {
Some(existing) => {
existing.last_seen_slot = candidate_blueprint.last_seen_slot;
existing.seen_count = existing.seen_count.saturating_add(1);
existing.score =
compute_candidate_score(existing.category.as_str(), existing.seen_count);
existing.confidence = compute_candidate_confidence(existing.score);
crate::KhbbSessionCandidateUpdate {
is_new: false,
candidate: existing.clone(),
}
},
None => {
self.candidates
.insert(candidate_blueprint.key.clone(), candidate_blueprint.clone());
crate::KhbbSessionCandidateUpdate {
is_new: true,
candidate: candidate_blueprint,
}
},
}
}
}
fn build_session_candidate_from_correlated_signal(
signal: &crate::KhbbCorrelatedSignal,
) -> crate::KhbbSessionCandidate {
match signal {
crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(inner) => {
let key = std::format!("token_account:{}", inner.pubkey);
let category = std::string::String::from("token_account");
let seen_count = 1u64;
let score = compute_candidate_score(category.as_str(), seen_count);
let confidence = compute_candidate_confidence(score);
crate::KhbbSessionCandidate {
key,
category,
pubkey: Some(inner.pubkey.clone()),
first_seen_slot: inner.context_slot,
last_seen_slot: inner.context_slot,
seen_count,
score,
confidence,
}
},
crate::KhbbCorrelatedSignal::PotentialNewTokenMint(inner) => {
let key = std::format!("mint:{}", inner.pubkey);
let category = std::string::String::from("mint");
let seen_count = 1u64;
let score = compute_candidate_score(category.as_str(), seen_count);
let confidence = compute_candidate_confidence(score);
crate::KhbbSessionCandidate {
key,
category,
pubkey: Some(inner.pubkey.clone()),
first_seen_slot: inner.context_slot,
last_seen_slot: inner.context_slot,
seen_count,
score,
confidence,
}
},
crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(inner) => {
let key = match &inner.pubkey {
Some(value) => std::format!("bootstrap:{}", value),
None => std::format!("bootstrap:slot:{}", inner.context_slot),
};
let category = std::string::String::from("bootstrap_flow");
let seen_count = 1u64;
let score = compute_candidate_score(category.as_str(), seen_count);
let confidence = compute_candidate_confidence(score);
crate::KhbbSessionCandidate {
key,
category,
pubkey: inner.pubkey.clone(),
first_seen_slot: inner.context_slot,
last_seen_slot: inner.context_slot,
seen_count,
score,
confidence,
}
},
}
}
fn compute_candidate_score(category: &str, seen_count: u64) -> u64 {
let base_score = match category {
"mint" => 80u64,
"bootstrap_flow" => 60u64,
"token_account" => 40u64,
_ => 20u64,
};
base_score.saturating_add(seen_count.saturating_sub(1).saturating_mul(10))
}
fn compute_candidate_confidence(score: u64) -> crate::KhbbCandidateConfidence {
if score >= 80 {
return crate::KhbbCandidateConfidence::High;
}
if score >= 50 {
return crate::KhbbCandidateConfidence::Medium;
}
crate::KhbbCandidateConfidence::Low
}
#[cfg(test)]
mod tests {
#[test]
fn upsert_from_correlated_signal_inserts_new_token_account_candidate() {
let mut tracker = super::KhbbSessionCandidateTracker::new();
let signal = crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(
crate::KhbbConfirmedTokenAccountUpdateSignal {
pubkey: std::string::String::from("SomePubkey"),
context_slot: 100,
owner: Some(crate::ids::SPL_TOKEN_PROGRAM_ID.to_string()),
lamports: Some(123),
},
);
let update = tracker.upsert_from_correlated_signal(&signal);
assert!(update.is_new);
assert_eq!(update.candidate.key, "token_account:SomePubkey");
assert_eq!(update.candidate.seen_count, 1);
assert_eq!(update.candidate.confidence, crate::KhbbCandidateConfidence::Low);
}
#[test]
fn upsert_from_correlated_signal_updates_existing_candidate() {
let mut tracker = super::KhbbSessionCandidateTracker::new();
let signal = crate::KhbbCorrelatedSignal::ConfirmedTokenAccountUpdate(
crate::KhbbConfirmedTokenAccountUpdateSignal {
pubkey: std::string::String::from("SomePubkey"),
context_slot: 100,
owner: Some(crate::ids::SPL_TOKEN_PROGRAM_ID.to_string()),
lamports: Some(123),
},
);
let first = tracker.upsert_from_correlated_signal(&signal);
assert!(first.is_new);
let second = tracker.upsert_from_correlated_signal(&signal);
assert!(!second.is_new);
assert_eq!(second.candidate.seen_count, 2);
assert_eq!(second.candidate.score, 50);
assert_eq!(second.candidate.confidence, crate::KhbbCandidateConfidence::Medium);
}
#[test]
fn mint_candidate_starts_with_high_confidence() {
let mut tracker = super::KhbbSessionCandidateTracker::new();
let signal = crate::KhbbCorrelatedSignal::PotentialNewTokenMint(
crate::KhbbPotentialNewTokenMintSignal {
pubkey: std::string::String::from("MintPubkey"),
context_slot: 200,
owner: Some(crate::ids::SPL_TOKEN_PROGRAM_ID.to_string()),
lamports: Some(456),
},
);
let update = tracker.upsert_from_correlated_signal(&signal);
assert!(update.is_new);
assert_eq!(update.candidate.confidence, crate::KhbbCandidateConfidence::High);
}
#[test]
fn bootstrap_candidate_starts_with_medium_confidence() {
let mut tracker = super::KhbbSessionCandidateTracker::new();
let signal = crate::KhbbCorrelatedSignal::PotentialTokenBootstrapFlow(
crate::KhbbPotentialTokenBootstrapFlowSignal {
pubkey: Some(std::string::String::from("FlowPubkey")),
context_slot: 300,
saw_token_program: true,
saw_associated_token_account: true,
saw_bootstrap_logs: false,
},
);
let update = tracker.upsert_from_correlated_signal(&signal);
assert!(update.is_new);
assert_eq!(update.candidate.confidence, crate::KhbbCandidateConfidence::Medium);
}
}