This commit is contained in:
2026-04-25 11:36:36 +02:00
parent 2fee5db206
commit 2391d4c061
4 changed files with 464 additions and 58 deletions

View File

@@ -73,10 +73,10 @@ impl KbSolanaWsDetectionService {
let observation_input = crate::KbDetectionObservationInput::new(
observation_kind,
crate::KbObservationSourceKind::WsRpc,
endpoint_name,
object_key,
endpoint_name.clone(),
object_key.clone(),
slot,
payload,
payload.clone(),
);
let observation_id_result = self
.persistence
@@ -86,6 +86,32 @@ impl KbSolanaWsDetectionService {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let should_emit_signal = match notification.method.as_str() {
"accountNotification" => true,
"logsNotification" => true,
"signatureNotification" => true,
_ => false,
};
if should_emit_signal {
let signal_input = crate::KbDetectionSignalInput::new(
build_signal_kind_for_notification(
notification.method.as_str(),
&notification.params.result,
),
build_signal_severity_for_notification(
notification.method.as_str(),
&notification.params.result,
),
object_key,
Some(observation_id),
None,
payload,
);
let signal_result = self.persistence.record_signal(&signal_input).await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id })
}
@@ -114,9 +140,6 @@ impl KbSolanaWsDetectionService {
Some(parsed_type) => parsed_type,
None => return Ok(None),
};
if parsed_type.as_str() != "mint" {
return Ok(None);
}
let token_program_option = extract_account_owner(account_value);
let token_program = match token_program_option {
Some(token_program) => token_program,
@@ -127,13 +150,30 @@ impl KbSolanaWsDetectionService {
{
return Ok(None);
}
let decimals = extract_decimals_from_account_value(account_value);
let decimals = if parsed_type.as_str() == "mint" {
extract_decimals_from_account_value(account_value)
} else if parsed_type.as_str() == "account" {
extract_token_account_decimals_from_account_value(account_value)
} else {
None
};
let mint = if parsed_type.as_str() == "mint" {
pubkey.clone()
} else if parsed_type.as_str() == "account" {
let mint_option = extract_parsed_account_mint(account_value);
match mint_option {
Some(mint) => mint,
None => return Ok(None),
}
} else {
return Ok(None);
};
let slot =
extract_slot_from_result(notification.method.as_str(), &notification.params.result);
let payload = build_notification_payload(notification);
let is_quote_token = pubkey == crate::WSOL_MINT_ID.to_string();
let is_quote_token = mint == crate::WSOL_MINT_ID.to_string();
let input = crate::KbDetectionTokenCandidateInput::new(
pubkey,
mint,
None,
None,
decimals,
@@ -144,7 +184,11 @@ impl KbSolanaWsDetectionService {
slot,
"ws.program_notification".to_string(),
payload.clone(),
"signal.token_mint_account_detected".to_string(),
if parsed_type.as_str() == "mint" {
"signal.token_mint_account_detected".to_string()
} else {
"signal.token_account_detected".to_string()
},
crate::KbAnalysisSignalSeverity::Medium,
None,
Some(payload),
@@ -342,6 +386,235 @@ fn extract_decimals_from_account_value(
}
}
/// Extracts the owner program id from one program notification result.
fn extract_program_notification_owner(
result: &serde_json::Value,
) -> std::option::Option<std::string::String> {
let account_value_option = extract_account_value_from_result(result);
let account_value = match account_value_option {
Some(account_value) => account_value,
None => return None,
};
extract_account_owner(account_value)
}
/// Extracts the parsed token amount decimals from one parsed token account notification.
fn extract_token_account_decimals_from_account_value(
account_value: &serde_json::Value,
) -> std::option::Option<u8> {
let data_option = account_value.get("data");
let data = match data_option {
Some(data) => data,
None => return None,
};
let parsed_option = data.get("parsed");
let parsed = match parsed_option {
Some(parsed) => parsed,
None => return None,
};
let info_option = parsed.get("info");
let info = match info_option {
Some(info) => info,
None => return None,
};
let token_amount_option = info.get("tokenAmount");
let token_amount = match token_amount_option {
Some(token_amount) => token_amount,
None => return None,
};
let decimals_option = token_amount
.get("decimals")
.and_then(serde_json::Value::as_u64);
let decimals = match decimals_option {
Some(decimals) => decimals,
None => return None,
};
let convert_result = u8::try_from(decimals);
match convert_result {
Ok(decimals) => Some(decimals),
Err(_) => None,
}
}
/// Extracts a parsed account mint from one account-like JSON object.
fn extract_parsed_account_mint(
account_value: &serde_json::Value,
) -> std::option::Option<std::string::String> {
let data_option = account_value.get("data");
let data = match data_option {
Some(data) => data,
None => return None,
};
let parsed_option = data.get("parsed");
let parsed = match parsed_option {
Some(parsed) => parsed,
None => return None,
};
let info_option = parsed.get("info");
let info = match info_option {
Some(info) => info,
None => return None,
};
let mint_option = info.get("mint").and_then(serde_json::Value::as_str);
match mint_option {
Some(mint) => Some(mint.to_string()),
None => None,
}
}
/// Extracts log lines from one logs notification result.
fn extract_logs_lines(result: &serde_json::Value) -> std::vec::Vec<std::string::String> {
let mut lines = std::vec::Vec::new();
let value_option = result.get("value");
let value = match value_option {
Some(value) => value,
None => return lines,
};
let logs_option = value.get("logs");
let logs = match logs_option {
Some(logs) => logs,
None => return lines,
};
let array_option = logs.as_array();
let array = match array_option {
Some(array) => array,
None => return lines,
};
for item in array {
let line_option = item.as_str();
if let Some(line) = line_option {
lines.push(line.to_string());
}
}
lines
}
/// Extracts the error field from a signature notification result.
fn extract_signature_notification_err(
result: &serde_json::Value,
) -> std::option::Option<serde_json::Value> {
let value_option = result.get("value");
let value = match value_option {
Some(value) => value,
None => return None,
};
match value.get("err") {
Some(err) => Some(err.clone()),
None => None,
}
}
/// Builds a more specific signal kind for one WebSocket notification.
fn build_signal_kind_for_notification(
method: &str,
result: &serde_json::Value,
) -> std::string::String {
match method {
"accountNotification" => {
let account_value_option = extract_account_value_from_result(result);
if let Some(account_value) = account_value_option {
let parsed_type_option = extract_parsed_account_type(account_value);
if let Some(parsed_type) = parsed_type_option {
return format!("signal.account_notification.{parsed_type}");
}
let owner_option = extract_account_owner(account_value);
if let Some(owner) = owner_option {
if owner == crate::SPL_TOKEN_PROGRAM_ID.to_string() {
return "signal.account_notification.spl_token".to_string();
}
if owner == crate::SPL_TOKEN_2022_PROGRAM_ID.to_string() {
return "signal.account_notification.spl_token_2022".to_string();
}
}
}
"signal.account_notification.generic".to_string()
}
"logsNotification" => {
let lines = extract_logs_lines(result);
for line in &lines {
if line.contains("Instruction: InitializeMint") {
return "signal.logs_notification.initialize_mint".to_string();
}
if line.contains("Instruction: MintTo") {
return "signal.logs_notification.mint_to".to_string();
}
if line.contains("Instruction: Burn") {
return "signal.logs_notification.burn".to_string();
}
if line.contains("Instruction: InitializeAccount") {
return "signal.logs_notification.initialize_account".to_string();
}
}
"signal.logs_notification.generic".to_string()
}
"signatureNotification" => {
let err_option = extract_signature_notification_err(result);
match err_option {
Some(err) => {
if err.is_null() {
"signal.signature_notification.confirmed".to_string()
} else {
"signal.signature_notification.failed".to_string()
}
}
None => "signal.signature_notification.generic".to_string(),
}
}
"programNotification" => {
let owner_option = extract_program_notification_owner(result);
let owner = match owner_option {
Some(owner) => owner,
None => return "signal.program_notification.generic".to_string(),
};
if owner == crate::SPL_TOKEN_PROGRAM_ID.to_string() {
return "signal.program_notification.spl_token".to_string();
}
if owner == crate::SPL_TOKEN_2022_PROGRAM_ID.to_string() {
return "signal.program_notification.spl_token_2022".to_string();
}
"signal.program_notification.generic".to_string()
}
_ => format!(
"signal.{}",
method.replace("Notification", "").to_lowercase()
),
}
}
/// Builds a more specific severity for one WebSocket notification.
fn build_signal_severity_for_notification(
method: &str,
result: &serde_json::Value,
) -> crate::KbAnalysisSignalSeverity {
match method {
"programNotification" => crate::KbAnalysisSignalSeverity::Medium,
"accountNotification" => crate::KbAnalysisSignalSeverity::Low,
"logsNotification" => {
let lines = extract_logs_lines(result);
for line in &lines {
if line.contains("Instruction: InitializeMint") {
return crate::KbAnalysisSignalSeverity::Medium;
}
}
crate::KbAnalysisSignalSeverity::Low
}
"signatureNotification" => {
let err_option = extract_signature_notification_err(result);
match err_option {
Some(err) => {
if err.is_null() {
crate::KbAnalysisSignalSeverity::Low
} else {
crate::KbAnalysisSignalSeverity::Medium
}
}
None => crate::KbAnalysisSignalSeverity::Low,
}
}
_ => crate::KbAnalysisSignalSeverity::Low,
}
}
#[cfg(test)]
mod tests {
async fn create_database() -> crate::KbDatabase {
@@ -409,6 +682,46 @@ mod tests {
}
}
fn build_logs_notification() -> crate::KbJsonRpcWsNotification {
crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "logsNotification".to_string(),
params: crate::KbJsonRpcWsNotificationParams {
result: serde_json::json!({
"context": {
"slot": 888888_u64
},
"value": {
"signature": "Sig111111111111111111111111111111111111111111111111",
"err": null,
"logs": [
"Program log: Instruction: InitializeMint"
]
}
}),
subscription: 3001_u64,
},
}
}
fn build_signature_notification() -> crate::KbJsonRpcWsNotification {
crate::KbJsonRpcWsNotification {
jsonrpc: "2.0".to_string(),
method: "signatureNotification".to_string(),
params: crate::KbJsonRpcWsNotificationParams {
result: serde_json::json!({
"context": {
"slot": 999999_u64
},
"value": {
"err": null
}
}),
subscription: 4001_u64,
},
}
}
#[tokio::test]
async fn slot_notification_records_observation() {
let database = create_database().await;
@@ -500,4 +813,90 @@ mod tests {
"Mint111111111111111111111111111111111111111"
);
}
#[tokio::test]
async fn logs_notification_records_observation_and_signal() {
let database = create_database().await;
let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let detector = crate::KbSolanaWsDetectionService::new(persistence);
let outcome_result = detector
.process_notification(
Some("helius_primary_ws_programs".to_string()),
&build_logs_notification(),
)
.await;
let outcome = match outcome_result {
Ok(outcome) => outcome,
Err(error) => panic!("process_notification failed: {error}"),
};
match outcome {
crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => {
assert!(observation_id > 0);
}
_ => panic!("unexpected detection outcome"),
}
let observations_result =
crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10)
.await;
let observations = match observations_result {
Ok(observations) => observations,
Err(error) => panic!("list_recent_onchain_observations failed: {error}"),
};
let signals_result =
crate::list_recent_analysis_signals(detector.persistence().database().as_ref(), 10)
.await;
let signals = match signals_result {
Ok(signals) => signals,
Err(error) => panic!("list_recent_analysis_signals failed: {error}"),
};
assert_eq!(observations.len(), 1);
assert_eq!(signals.len(), 1);
assert_eq!(
signals[0].signal_kind,
"signal.logs_notification.initialize_mint"
);
}
#[tokio::test]
async fn signature_notification_records_observation_and_signal() {
let database = create_database().await;
let persistence = crate::KbDetectionPersistenceService::new(std::sync::Arc::new(database));
let detector = crate::KbSolanaWsDetectionService::new(persistence);
let outcome_result = detector
.process_notification(
Some("mainnet_public_ws_slots".to_string()),
&build_signature_notification(),
)
.await;
let outcome = match outcome_result {
Ok(outcome) => outcome,
Err(error) => panic!("process_notification failed: {error}"),
};
match outcome {
crate::KbSolanaWsDetectionOutcome::ObservationRecorded { observation_id } => {
assert!(observation_id > 0);
}
_ => panic!("unexpected detection outcome"),
}
let observations_result =
crate::list_recent_onchain_observations(detector.persistence().database().as_ref(), 10)
.await;
let observations = match observations_result {
Ok(observations) => observations,
Err(error) => panic!("list_recent_onchain_observations failed: {error}"),
};
let signals_result =
crate::list_recent_analysis_signals(detector.persistence().database().as_ref(), 10)
.await;
let signals = match signals_result {
Ok(signals) => signals,
Err(error) => panic!("list_recent_analysis_signals failed: {error}"),
};
assert_eq!(observations.len(), 1);
assert_eq!(signals.len(), 1);
assert_eq!(
signals[0].signal_kind,
"signal.signature_notification.confirmed"
);
}
}