This commit is contained in:
2026-04-25 12:07:46 +02:00
parent 2391d4c061
commit d4de95e6db
9 changed files with 444 additions and 10 deletions

View File

@@ -15,6 +15,8 @@ pub use crate::detect::service::KbDetectionPersistenceService;
pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome;
pub use crate::detect::solana_ws::KbSolanaWsDetectionService;
pub use crate::detect::types::KbDetectionObservationInput;
pub use crate::detect::types::KbDetectionPoolCandidateInput;
pub use crate::detect::types::KbDetectionPoolCandidateResult;
pub use crate::detect::types::KbDetectionSignalInput;
pub use crate::detect::types::KbDetectionTokenCandidateInput;
pub use crate::detect::types::KbDetectionTokenCandidateResult;

View File

@@ -14,7 +14,6 @@ impl KbDetectionPersistenceService {
pub fn new(database: std::sync::Arc<crate::KbDatabase>) -> Self {
Self { database }
}
/// Returns the shared database handle.
pub fn database(&self) -> &std::sync::Arc<crate::KbDatabase> {
&self.database
@@ -111,6 +110,125 @@ impl KbDetectionPersistenceService {
signal_id,
})
}
/// Registers one pool candidate from a technical source.
///
/// This method:
/// - resolves one known DEX from the supplied program id,
/// - upserts one normalized pool entry,
/// - upserts one pool listing row,
/// - stores the technical observation and derived signal.
pub async fn register_pool_candidate(
&self,
input: &crate::KbDetectionPoolCandidateInput,
) -> Result<crate::KbDetectionPoolCandidateResult, crate::KbError> {
let dexes_result = crate::list_dexes(self.database.as_ref()).await;
let dexes = match dexes_result {
Ok(dexes) => dexes,
Err(error) => return Err(error),
};
let mut matched_dex_option: std::option::Option<crate::KbDexDto> = None;
for dex in dexes {
let mut matched = false;
if let Some(program_id) = &dex.program_id {
if program_id == &input.dex_program_id {
matched = true;
}
}
if !matched {
if let Some(router_program_id) = &dex.router_program_id {
if router_program_id == &input.dex_program_id {
matched = true;
}
}
}
if matched {
matched_dex_option = Some(dex);
break;
}
}
let matched_dex = match matched_dex_option {
Some(matched_dex) => matched_dex,
None => {
return Err(crate::KbError::Db(format!(
"cannot register pool candidate: no known dex matches program id '{}'",
input.dex_program_id
)));
}
};
let dex_id = match matched_dex.id {
Some(dex_id) => dex_id,
None => {
return Err(crate::KbError::Db(
"cannot register pool candidate: matched dex has no id".to_string(),
));
}
};
let pool_dto = crate::KbPoolDto::new(
dex_id,
input.pool_address.clone(),
crate::KbPoolKind::Unknown,
crate::KbPoolStatus::Pending,
);
let pool_id_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await;
let pool_id = match pool_id_result {
Ok(pool_id) => pool_id,
Err(error) => return Err(error),
};
let listing_dto = crate::KbPoolListingDto::new(
dex_id,
pool_id,
None,
input.source_kind,
input.endpoint_name.clone(),
None,
None,
None,
);
let pool_listing_id_result =
crate::upsert_pool_listing(self.database.as_ref(), &listing_dto).await;
let pool_listing_id = match pool_listing_id_result {
Ok(pool_listing_id) => pool_listing_id,
Err(error) => return Err(error),
};
let observation_input = crate::KbDetectionObservationInput::new(
input.observation_kind.clone(),
input.source_kind,
input.endpoint_name.clone(),
input.pool_address.clone(),
input.slot,
input.observation_payload.clone(),
);
let observation_id_result = self.record_observation(&observation_input).await;
let observation_id = match observation_id_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_payload = match &input.signal_payload {
Some(signal_payload) => signal_payload.clone(),
None => input.observation_payload.clone(),
};
let signal_input = crate::KbDetectionSignalInput::new(
input.signal_kind.clone(),
input.signal_severity,
input.pool_address.clone(),
Some(observation_id),
input.signal_score,
signal_payload,
);
let signal_id_result = self.record_signal(&signal_input).await;
let signal_id = match signal_id_result {
Ok(signal_id) => signal_id,
Err(error) => return Err(error),
};
Ok(crate::KbDetectionPoolCandidateResult {
dex_id,
pool_id,
pool_listing_id,
observation_id,
signal_id,
})
}
}
#[cfg(test)]
@@ -247,4 +365,60 @@ mod tests {
Some(result.observation_id)
);
}
#[tokio::test]
async fn register_pool_candidate_persists_pool_listing_observation_and_signal() {
let database = create_database().await;
let service = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let dex_id = crate::upsert_dex(
service.database().as_ref(),
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
Some("DexProgram111111111111111111111111111111111".to_string()),
None,
true,
),
)
.await
.expect("upsert dex must succeed");
assert!(dex_id > 0);
let result = service
.register_pool_candidate(&crate::KbDetectionPoolCandidateInput::new(
"Pool111111111111111111111111111111111111111".to_string(),
"DexProgram111111111111111111111111111111111".to_string(),
crate::KbObservationSourceKind::WsRpc,
Some("helius_primary_ws_programs".to_string()),
Some(999999),
"ws.program_notification".to_string(),
serde_json::json!({
"pool": "Pool111111111111111111111111111111111111111"
}),
"signal.pool_candidate.raydium".to_string(),
crate::KbAnalysisSignalSeverity::Medium,
None,
None,
))
.await
.expect("register pool candidate must succeed");
assert!(result.dex_id > 0);
assert!(result.pool_id > 0);
assert!(result.pool_listing_id > 0);
assert!(result.observation_id > 0);
assert!(result.signal_id > 0);
let pool = crate::get_pool_by_address(
service.database().as_ref(),
"Pool111111111111111111111111111111111111111",
)
.await
.expect("get pool must succeed");
assert!(pool.is_some());
let pool = pool.expect("pool must exist");
assert_eq!(pool.status, crate::KbPoolStatus::Pending);
let listing =
crate::get_pool_listing_by_pool_id(service.database().as_ref(), result.pool_id)
.await
.expect("get listing must succeed");
assert!(listing.is_some());
}
}

View File

@@ -3,7 +3,7 @@
//! Solana WebSocket detection bridge.
//!
//! This module converts raw Solana JSON-RPC WebSocket notifications into
//! normalized observations and, when possible, token candidates.
//! normalized observations and, when possible, token and pool candidates.
/// Result of one Solana WebSocket detection pass.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -20,6 +20,11 @@ pub enum KbSolanaWsDetectionOutcome {
/// Persistence result.
result: crate::KbDetectionTokenCandidateResult,
},
/// A pool candidate was registered.
PoolCandidateRegistered {
/// Persistence result.
result: crate::KbDetectionPoolCandidateResult,
},
}
/// Detection service for Solana WebSocket notifications.
@@ -62,6 +67,16 @@ impl KbSolanaWsDetectionService {
Ok(None) => {}
Err(error) => return Err(error),
}
let pool_candidate_result = self
.try_register_pool_candidate(endpoint_name.clone(), notification)
.await;
match pool_candidate_result {
Ok(Some(result)) => {
return Ok(crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { result });
}
Ok(None) => {}
Err(error) => return Err(error),
}
let payload = build_notification_payload(notification);
let object_key = build_object_key(
notification.method.as_str(),
@@ -199,6 +214,84 @@ impl KbSolanaWsDetectionService {
Err(error) => Err(error),
}
}
/// Tries to register a pool candidate from one notification.
async fn try_register_pool_candidate(
&self,
endpoint_name: std::option::Option<std::string::String>,
notification: &crate::KbJsonRpcWsNotification,
) -> Result<std::option::Option<crate::KbDetectionPoolCandidateResult>, crate::KbError> {
if notification.method.as_str() != "programNotification" {
return Ok(None);
}
let result_value = &notification.params.result;
let pubkey_option = extract_pubkey_from_result(result_value);
let pubkey = match pubkey_option {
Some(pubkey) => pubkey,
None => return Ok(None),
};
let owner_option = extract_program_notification_owner(result_value);
let owner = match owner_option {
Some(owner) => owner,
None => return Ok(None),
};
if owner == crate::SPL_TOKEN_PROGRAM_ID.to_string()
|| owner == crate::SPL_TOKEN_2022_PROGRAM_ID.to_string()
{
return Ok(None);
}
let dexes_result = crate::list_dexes(self.persistence.database().as_ref()).await;
let dexes = match dexes_result {
Ok(dexes) => dexes,
Err(error) => return Err(error),
};
let mut matched_dex_option: std::option::Option<crate::KbDexDto> = None;
for dex in dexes {
let mut matched = false;
if let Some(program_id) = &dex.program_id {
if program_id == &owner {
matched = true;
}
}
if !matched {
if let Some(router_program_id) = &dex.router_program_id {
if router_program_id == &owner {
matched = true;
}
}
}
if matched {
matched_dex_option = Some(dex);
break;
}
}
let matched_dex = match matched_dex_option {
Some(matched_dex) => matched_dex,
None => return Ok(None),
};
let payload = build_notification_payload(notification);
let slot =
extract_slot_from_result(notification.method.as_str(), &notification.params.result);
let signal_kind = format!("signal.pool_candidate.{}", matched_dex.code);
let input = crate::KbDetectionPoolCandidateInput::new(
pubkey,
owner,
crate::KbObservationSourceKind::WsRpc,
endpoint_name,
slot,
"ws.program_notification".to_string(),
payload.clone(),
signal_kind,
crate::KbAnalysisSignalSeverity::Medium,
None,
Some(payload),
);
let result = self.persistence.register_pool_candidate(&input).await;
match result {
Ok(result) => Ok(Some(result)),
Err(error) => Err(error),
}
}
}
/// Maps one WebSocket notification method to an observation kind.
@@ -247,6 +340,7 @@ fn build_object_key(
if let Some(slot) = slot_option {
return format!("slot:{slot}");
}
format!("subscription:{subscription}")
}
@@ -682,6 +776,31 @@ mod tests {
}
}
fn build_program_pool_candidate_notification() -> crate::KbJsonRpcWsNotification {
crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "programNotification".to_string(),
params: crate::KbJsonRpcWsNotificationParams {
result: serde_json::json!({
"context": {
"slot": 666666_u64
},
"value": {
"pubkey": "Pool111111111111111111111111111111111111111",
"account": {
"owner": "DexProgram111111111111111111111111111111111",
"data": [
"AQID",
"base64"
]
}
}
}),
subscription: 5555_u64,
},
}
}
fn build_logs_notification() -> crate::KbJsonRpcWsNotification {
crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
@@ -899,4 +1018,61 @@ mod tests {
"signal.signature_notification.confirmed"
);
}
#[tokio::test]
async fn program_pool_candidate_notification_registers_pool_candidate() {
let database = create_database().await;
let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let dex_id = crate::upsert_dex(
persistence.database().as_ref(),
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
Some("DexProgram111111111111111111111111111111111".to_string()),
None,
true,
),
)
.await
.expect("upsert dex must succeed");
assert!(dex_id > 0);
let detector = crate::KbSolanaWsDetectionService::new(persistence);
let outcome_result = detector
.process_notification(
Some("helius_primary_ws_programs".to_string()),
&build_program_pool_candidate_notification(),
)
.await;
let outcome = match outcome_result {
Ok(outcome) => outcome,
Err(error) => panic!("process_notification failed: {error}"),
};
let pool_id = match outcome {
crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { result } => {
assert!(result.dex_id > 0);
assert!(result.pool_id > 0);
assert!(result.pool_listing_id > 0);
result.pool_id
}
_ => panic!("unexpected detection outcome"),
};
let pool_result = crate::get_pool_by_address(
detector.persistence().database().as_ref(),
"Pool111111111111111111111111111111111111111",
)
.await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => panic!("get_pool_by_address failed: {error}"),
};
assert!(pool_option.is_some());
let listing_result =
crate::get_pool_listing_by_pool_id(detector.persistence().database().as_ref(), pool_id)
.await;
let listing_option = match listing_result {
Ok(listing_option) => listing_option,
Err(error) => panic!("get_pool_listing_by_pool_id failed: {error}"),
};
assert!(listing_option.is_some());
}
}

View File

@@ -115,6 +115,7 @@ pub struct KbDetectionTokenCandidateInput {
impl KbDetectionTokenCandidateInput {
/// Creates a new token candidate input.
#[allow(clippy::too_many_arguments)]
pub fn new(
mint: std::string::String,
symbol: std::option::Option<std::string::String>,
@@ -162,3 +163,77 @@ pub struct KbDetectionTokenCandidateResult {
/// Persisted signal id.
pub signal_id: i64,
}
/// One pool candidate detected from a technical source.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionPoolCandidateInput {
/// Pool address.
pub pool_address: std::string::String,
/// DEX program id or router program id that matched.
pub dex_program_id: std::string::String,
/// Observation source family.
pub source_kind: crate::KbObservationSourceKind,
/// Optional source endpoint logical name.
pub endpoint_name: std::option::Option<std::string::String>,
/// Optional slot number.
pub slot: std::option::Option<u64>,
/// Observation kind.
pub observation_kind: std::string::String,
/// Observation payload.
pub observation_payload: serde_json::Value,
/// Signal kind.
pub signal_kind: std::string::String,
/// Signal severity.
pub signal_severity: crate::KbAnalysisSignalSeverity,
/// Optional signal score.
pub signal_score: std::option::Option<f64>,
/// Optional dedicated signal payload.
pub signal_payload: std::option::Option<serde_json::Value>,
}
impl KbDetectionPoolCandidateInput {
/// Creates a new pool candidate input.
#[allow(clippy::too_many_arguments)]
pub fn new(
pool_address: std::string::String,
dex_program_id: std::string::String,
source_kind: crate::KbObservationSourceKind,
endpoint_name: std::option::Option<std::string::String>,
slot: std::option::Option<u64>,
observation_kind: std::string::String,
observation_payload: serde_json::Value,
signal_kind: std::string::String,
signal_severity: crate::KbAnalysisSignalSeverity,
signal_score: std::option::Option<f64>,
signal_payload: std::option::Option<serde_json::Value>,
) -> Self {
Self {
pool_address,
dex_program_id,
source_kind,
endpoint_name,
slot,
observation_kind,
observation_payload,
signal_kind,
signal_severity,
signal_score,
signal_payload,
}
}
}
/// Result of one pool candidate persistence operation.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbDetectionPoolCandidateResult {
/// Persisted dex id.
pub dex_id: i64,
/// Persisted pool id.
pub pool_id: i64,
/// Persisted pool listing id.
pub pool_listing_id: i64,
/// Persisted observation id.
pub observation_id: i64,
/// Persisted signal id.
pub signal_id: i64,
}

View File

@@ -38,6 +38,8 @@ pub struct KbWsDetectionRelayStats {
pub observation_count: u64,
/// Number of registered token candidates.
pub token_candidate_count: u64,
/// Number of registered pool candidates.
pub pool_candidate_count: u64,
/// Number of processing errors.
pub error_count: u64,
}
@@ -113,6 +115,9 @@ impl KbWsDetectionRelay {
crate::KbSolanaWsDetectionOutcome::TokenCandidateRegistered { .. } => {
stats.token_candidate_count += 1;
}
crate::KbSolanaWsDetectionOutcome::PoolCandidateRegistered { .. } => {
stats.pool_candidate_count += 1;
}
}
}
stats

View File

@@ -162,3 +162,5 @@ pub use crate::detect::KbSolanaWsDetectionService;
pub use crate::detect::KbWsDetectionNotificationEnvelope;
pub use crate::detect::KbWsDetectionRelay;
pub use crate::detect::KbWsDetectionRelayStats;
pub use crate::detect::KbDetectionPoolCandidateInput;
pub use crate::detect::KbDetectionPoolCandidateResult;