This commit is contained in:
2026-04-25 06:29:48 +02:00
parent fbd4d5d6ef
commit 04e09b0c97
7 changed files with 467 additions and 21 deletions

16
kb_lib/src/detect.rs Normal file
View File

@@ -0,0 +1,16 @@
// file: kb_lib/src/detect.rs
//! Detection pipeline facade.
//!
//! This module sits between transport/connectors and persistence.
//! It centralizes how technical observations, analysis signals and
//! candidate tokens are persisted before richer detection logic is added.
mod service;
mod types;
pub use crate::detect::service::KbDetectionPersistenceService;
pub use crate::detect::types::KbDetectionObservationInput;
pub use crate::detect::types::KbDetectionSignalInput;
pub use crate::detect::types::KbDetectionTokenCandidateInput;
pub use crate::detect::types::KbDetectionTokenCandidateResult;

View File

@@ -0,0 +1,250 @@
// file: kb_lib/src/detect/service.rs
//! Detection persistence service.
/// Persistence façade between technical detection and database storage.
#[derive(Debug, Clone)]
pub struct KbDetectionPersistenceService {
/// Shared database handle.
database: std::sync::Arc<crate::KbDatabase>,
}
impl KbDetectionPersistenceService {
/// Creates a new detection persistence service.
pub fn new(database: std::sync::Arc<crate::KbDatabase>) -> Self {
Self { database }
}
/// Returns the shared database handle.
pub fn database(&self) -> &std::sync::Arc<crate::KbDatabase> {
&self.database
}
/// Persists one on-chain observation.
pub async fn record_observation(
&self,
input: &crate::KbDetectionObservationInput,
) -> Result<i64, crate::KbError> {
let dto = crate::KbOnchainObservationDto::new(
input.observation_kind.clone(),
input.source_kind,
input.endpoint_name.clone(),
input.object_key.clone(),
input.slot,
input.payload.clone(),
);
crate::insert_onchain_observation(self.database.as_ref(), &dto).await
}
/// Persists one analysis signal.
pub async fn record_signal(
&self,
input: &crate::KbDetectionSignalInput,
) -> Result<i64, crate::KbError> {
let dto = crate::KbAnalysisSignalDto::new(
input.signal_kind.clone(),
input.severity,
input.object_key.clone(),
input.related_observation_id,
input.score,
input.payload.clone(),
);
crate::insert_analysis_signal(self.database.as_ref(), &dto).await
}
/// Registers one token candidate from a technical source.
///
/// This method:
/// - upserts the normalized token entry,
/// - stores the underlying technical observation,
/// - stores the derived signal linked to that observation.
pub async fn register_token_candidate(
&self,
input: &crate::KbDetectionTokenCandidateInput,
) -> Result<crate::KbDetectionTokenCandidateResult, crate::KbError> {
let token_dto = crate::KbTokenDto::new(
input.mint.clone(),
input.symbol.clone(),
input.name.clone(),
input.decimals,
input.token_program.clone(),
input.is_quote_token,
);
let token_id_result = crate::upsert_token(self.database.as_ref(), &token_dto).await;
let token_id = match token_id_result {
Ok(token_id) => token_id,
Err(error) => return Err(error),
};
let observation_input = crate::KbDetectionObservationInput::new(
input.observation_kind.clone(),
input.source_kind,
input.endpoint_name.clone(),
input.mint.clone(),
input.slot,
input.observation_payload.clone(),
);
let observation_id_result = self.record_observation(&observation_input).await;
let observation_id = match observation_id_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_payload = match &input.signal_payload {
Some(signal_payload) => signal_payload.clone(),
None => input.observation_payload.clone(),
};
let signal_input = crate::KbDetectionSignalInput::new(
input.signal_kind.clone(),
input.signal_severity,
input.mint.clone(),
Some(observation_id),
input.signal_score,
signal_payload,
);
let signal_id_result = self.record_signal(&signal_input).await;
let signal_id = match signal_id_result {
Ok(signal_id) => signal_id,
Err(error) => return Err(error),
};
Ok(crate::KbDetectionTokenCandidateResult {
token_id,
observation_id,
signal_id,
})
}
}
#[cfg(test)]
mod tests {
async fn create_database() -> crate::KbDatabase {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("detect_pipeline.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed")
}
#[tokio::test]
async fn record_observation_and_signal_work() {
let database = create_database().await;
let service = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let observation_id = service
.record_observation(&crate::KbDetectionObservationInput::new(
"rpc.program_notification".to_string(),
crate::KbObservationSourceKind::WsRpc,
Some("mainnet_public_ws_slots".to_string()),
"So11111111111111111111111111111111111111112".to_string(),
Some(123456),
serde_json::json!({
"mint": "So11111111111111111111111111111111111111112"
}),
))
.await
.expect("record observation must succeed");
assert!(observation_id > 0);
let signal_id = service
.record_signal(&crate::KbDetectionSignalInput::new(
"rpc.token_candidate".to_string(),
crate::KbAnalysisSignalSeverity::Low,
"So11111111111111111111111111111111111111112".to_string(),
Some(observation_id),
Some(0.25),
serde_json::json!({
"reason": "test"
}),
))
.await
.expect("record signal must succeed");
assert!(signal_id > 0);
let observations = crate::list_recent_onchain_observations(service.database().as_ref(), 10)
.await
.expect("list observations must succeed");
let signals = crate::list_recent_analysis_signals(service.database().as_ref(), 10)
.await
.expect("list signals must succeed");
assert_eq!(observations.len(), 1);
assert_eq!(signals.len(), 1);
assert_eq!(
observations[0].object_key,
"So11111111111111111111111111111111111111112"
);
assert_eq!(
signals[0].object_key,
"So11111111111111111111111111111111111111112"
);
}
#[tokio::test]
async fn register_token_candidate_persists_token_observation_and_signal() {
let database = create_database().await;
let service = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let result = service
.register_token_candidate(&crate::KbDetectionTokenCandidateInput::new(
"Mint111111111111111111111111111111111111111".to_string(),
Some("TEST".to_string()),
Some("Test Token".to_string()),
Some(6),
crate::SPL_TOKEN_PROGRAM_ID.to_string(),
false,
crate::KbObservationSourceKind::WsRpc,
Some("helius_primary_ws_programs".to_string()),
Some(777777),
"rpc.token_candidate".to_string(),
serde_json::json!({
"mint": "Mint111111111111111111111111111111111111111",
"source": "ws"
}),
"signal.token_candidate".to_string(),
crate::KbAnalysisSignalSeverity::Medium,
Some(0.70),
None,
))
.await
.expect("register token candidate must succeed");
assert!(result.token_id > 0);
assert!(result.observation_id > 0);
assert!(result.signal_id > 0);
let token = crate::get_token_by_mint(
service.database().as_ref(),
"Mint111111111111111111111111111111111111111",
)
.await
.expect("get token must succeed");
assert!(token.is_some());
assert_eq!(
token.expect("token must exist").symbol.as_deref(),
Some("TEST")
);
let observations = crate::list_recent_onchain_observations(service.database().as_ref(), 10)
.await
.expect("list observations must succeed");
let signals = crate::list_recent_analysis_signals(service.database().as_ref(), 10)
.await
.expect("list signals must succeed");
assert_eq!(observations.len(), 1);
assert_eq!(signals.len(), 1);
assert_eq!(
observations[0].object_key,
"Mint111111111111111111111111111111111111111"
);
assert_eq!(
signals[0].object_key,
"Mint111111111111111111111111111111111111111"
);
assert_eq!(
signals[0].related_observation_id,
Some(result.observation_id)
);
}
}

164
kb_lib/src/detect/types.rs Normal file
View File

@@ -0,0 +1,164 @@
// file: kb_lib/src/detect/types.rs
//! Detection pipeline input and output types.
/// One normalized observation ready to be persisted.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionObservationInput {
/// Observation kind.
pub observation_kind: std::string::String,
/// Observation source family.
pub source_kind: crate::KbObservationSourceKind,
/// Optional logical source endpoint name.
pub endpoint_name: std::option::Option<std::string::String>,
/// Logical object key, for example a mint, signature or pool address.
pub object_key: std::string::String,
/// Optional slot number.
pub slot: std::option::Option<u64>,
/// JSON payload.
pub payload: serde_json::Value,
}
impl KbDetectionObservationInput {
/// Creates a new detection observation input.
pub fn new(
observation_kind: std::string::String,
source_kind: crate::KbObservationSourceKind,
endpoint_name: std::option::Option<std::string::String>,
object_key: std::string::String,
slot: std::option::Option<u64>,
payload: serde_json::Value,
) -> Self {
Self {
observation_kind,
source_kind,
endpoint_name,
object_key,
slot,
payload,
}
}
}
/// One normalized signal ready to be persisted.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionSignalInput {
/// Signal kind.
pub signal_kind: std::string::String,
/// Signal severity.
pub severity: crate::KbAnalysisSignalSeverity,
/// Logical object key, for example a mint, signature or pool address.
pub object_key: std::string::String,
/// Optional related observation id.
pub related_observation_id: std::option::Option<i64>,
/// Optional score.
pub score: std::option::Option<f64>,
/// JSON payload.
pub payload: serde_json::Value,
}
impl KbDetectionSignalInput {
/// Creates a new detection signal input.
pub fn new(
signal_kind: std::string::String,
severity: crate::KbAnalysisSignalSeverity,
object_key: std::string::String,
related_observation_id: std::option::Option<i64>,
score: std::option::Option<f64>,
payload: serde_json::Value,
) -> Self {
Self {
signal_kind,
severity,
object_key,
related_observation_id,
score,
payload,
}
}
}
/// One token candidate detected from a technical source.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionTokenCandidateInput {
/// Token mint address.
pub mint: std::string::String,
/// Optional token symbol.
pub symbol: std::option::Option<std::string::String>,
/// Optional token name.
pub name: std::option::Option<std::string::String>,
/// Optional decimals.
pub decimals: std::option::Option<u8>,
/// Token program id.
pub token_program: std::string::String,
/// Whether the token is typically used as quote token.
pub is_quote_token: bool,
/// Observation source family.
pub source_kind: crate::KbObservationSourceKind,
/// Optional source endpoint logical name.
pub endpoint_name: std::option::Option<std::string::String>,
/// Optional slot number.
pub slot: std::option::Option<u64>,
/// Observation kind.
pub observation_kind: std::string::String,
/// Observation payload.
pub observation_payload: serde_json::Value,
/// Signal kind.
pub signal_kind: std::string::String,
/// Signal severity.
pub signal_severity: crate::KbAnalysisSignalSeverity,
/// Optional signal score.
pub signal_score: std::option::Option<f64>,
/// Optional dedicated signal payload.
pub signal_payload: std::option::Option<serde_json::Value>,
}
impl KbDetectionTokenCandidateInput {
/// Creates a new token candidate input.
pub fn new(
mint: std::string::String,
symbol: std::option::Option<std::string::String>,
name: std::option::Option<std::string::String>,
decimals: std::option::Option<u8>,
token_program: std::string::String,
is_quote_token: bool,
source_kind: crate::KbObservationSourceKind,
endpoint_name: std::option::Option<std::string::String>,
slot: std::option::Option<u64>,
observation_kind: std::string::String,
observation_payload: serde_json::Value,
signal_kind: std::string::String,
signal_severity: crate::KbAnalysisSignalSeverity,
signal_score: std::option::Option<f64>,
signal_payload: std::option::Option<serde_json::Value>,
) -> Self {
Self {
mint,
symbol,
name,
decimals,
token_program,
is_quote_token,
source_kind,
endpoint_name,
slot,
observation_kind,
observation_payload,
signal_kind,
signal_severity,
signal_score,
signal_payload,
}
}
}
/// Result of one token candidate persistence operation.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionTokenCandidateResult {
/// Persisted token id.
pub token_id: i64,
/// Persisted observation id.
pub observation_id: i64,
/// Persisted signal id.
pub signal_id: i64,
}

View File

@@ -19,6 +19,7 @@ mod ws_client;
mod rpc_ws_solana;
mod http_pool;
mod db;
mod detect;
pub use crate::config::KbAppConfig;
pub use crate::config::KbConfig;
@@ -151,3 +152,8 @@ pub use crate::db::list_pairs;
pub use crate::db::list_pool_listings;
pub use crate::db::list_pool_tokens_by_pool_id;
pub use crate::db::list_pools;
pub use crate::detect::KbDetectionObservationInput;
pub use crate::detect::KbDetectionPersistenceService;
pub use crate::detect::KbDetectionSignalInput;
pub use crate::detect::KbDetectionTokenCandidateInput;
pub use crate::detect::KbDetectionTokenCandidateResult;