This commit is contained in:
2026-04-19 17:58:33 +02:00
parent a78c4c52e2
commit 3964452af0
6 changed files with 849 additions and 74 deletions

View File

@@ -30,6 +30,7 @@ mod signal_correlation;
mod candidate;
mod session_candidate;
mod session_tracker;
mod ws_source;
/// Runs the listener application bootstrap workflow.
pub use crate::app::run_listener_app;
@@ -155,3 +156,7 @@ pub use crate::session_candidate::KhbbSessionCandidate;
pub use crate::session_tracker::KhbbSessionCandidateTracker;
/// Result of inserting or updating a session candidate.
pub use crate::session_tracker::KhbbSessionCandidateUpdate;
/// Event emitted by the autonomous WebSocket source.
pub use crate::ws_source::KhbbWsSourceEvent;
/// Handle used by the listener runtime to control a WebSocket source task.
pub use crate::ws_source::KhbbWsSourceHandle;

View File

@@ -52,14 +52,14 @@ pub async fn run_listener_runtime(
};
let ws_client_config =
crate::KhbbSolanaWsRpcClientConfig { url: config.solana_ws_rpc_url.clone() };
let ws_client_result = crate::KhbbSolanaWsRpcClient::new(ws_client_config);
let mut ws_client = match ws_client_result {
let ws_source_result = crate::KhbbWsSourceHandle::spawn(ws_client_config);
let mut ws_source = match ws_source_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let ws_connect_result = ws_client.connect().await;
let ws_connect_result = ws_source.connect().await;
match ws_connect_result {
Ok(()) => {
tracing::info!(
@@ -75,7 +75,7 @@ pub async fn run_listener_runtime(
let mut active_ws_subscriptions =
std::vec::Vec::<crate::domain::KhbbActiveWsSubscription>::new();
if config.enable_ws_slot_subscribe {
let slot_subscribe_result = ws_client.slot_subscribe(1).await;
let slot_subscribe_result = ws_source.slot_subscribe(1).await;
let slot_subscribe_output = match slot_subscribe_result {
Ok(value) => value,
Err(error) => {
@@ -135,7 +135,7 @@ pub async fn run_listener_runtime(
});
}
if config.enable_ws_logs_subscribe {
let logs_subscribe_result = ws_client
let logs_subscribe_result = ws_source
.logs_subscribe(solana_rpc_client_api::config::RpcTransactionLogsFilter::All, None, 2)
.await;
let logs_subscribe_output = match logs_subscribe_result {
@@ -200,7 +200,7 @@ pub async fn run_listener_runtime(
let mut program_request_id: u64 = 10;
for program_id in &config.ws_program_subscribe_program_ids {
let program_subscribe_result =
ws_client.program_subscribe(program_id, None, program_request_id).await;
ws_source.program_subscribe(program_id, None, program_request_id).await;
let program_subscribe_output = match program_subscribe_result {
Ok(value) => value,
Err(error) => {
@@ -315,13 +315,15 @@ pub async fn run_listener_runtime(
}
let ws_read_timeout_result = tokio::time::timeout(
std::time::Duration::from_millis(50),
ws_client.read_next_incoming_message(),
ws_source.recv_event(),
)
.await;
match ws_read_timeout_result {
Ok(read_result) => {
match read_result {
Ok(crate::KhbbWsIncomingMessage::Response { raw, id, .. }) => {
Ok(crate::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::Response { raw, id, .. },
)) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
@@ -346,7 +348,9 @@ pub async fn run_listener_runtime(
}
}
}
Ok(crate::KhbbWsIncomingMessage::Notification { raw, method, .. }) => {
Ok(crate::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::Notification { raw, method, .. },
)) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
@@ -1139,7 +1143,9 @@ pub async fn run_listener_runtime(
}
}
}
Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => {
Ok(crate::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::Unknown { raw, .. },
)) => {
let insert_ws_message_result = crate::storage::insert_raw_ws_message(
pool,
session.id,
@@ -1163,7 +1169,9 @@ pub async fn run_listener_runtime(
}
}
}
Ok(crate::KhbbWsIncomingMessage::StreamEnded) => {
Ok(crate::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::StreamEnded,
)) => {
tracing::info!(
listener_session_id = session.id,
"websocket stream ended"
@@ -1177,7 +1185,6 @@ pub async fn run_listener_runtime(
error = %error,
"failed to read websocket message"
);
final_status = std::string::String::from("ws_read_error");
break;
}
@@ -1186,6 +1193,11 @@ pub async fn run_listener_runtime(
Err(_) => {}
}
if tick_count >= config.listener_max_ticks {
tracing::info!(
listener_session_id = session.id,
tick_count = tick_count,
"listener reached max ticks, stopping loop"
);
break;
}
}
@@ -1227,67 +1239,10 @@ pub async fn run_listener_runtime(
tracing::info!(
listener_session_id = session.id,
subscription_count = active_ws_subscriptions.len(),
"starting websocket unsubscribe phase"
"starting websocket close phase"
);
for subscription in &active_ws_subscriptions {
let unsubscribe_timeout_result = tokio::time::timeout(
std::time::Duration::from_millis(500),
ws_client.unsubscribe(
subscription.kind,
subscription.subscription_id,
1000u64.saturating_add(tick_count).saturating_add(subscription.request_id),
),
)
.await;
match unsubscribe_timeout_result {
Ok(unsubscribe_result) => match unsubscribe_result {
Ok(value) => {
tracing::info!(
listener_session_id = session.id,
unsubscribed = value,
subscription_id = subscription.subscription_id,
kind = ?subscription.kind,
label = ?subscription.label,
"websocket subscription cancelled"
);
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
subscription_id = subscription.subscription_id,
kind = ?subscription.kind,
label = ?subscription.label,
"failed to cancel websocket subscription"
);
},
},
Err(_) => {
tracing::error!(
listener_session_id = session.id,
subscription_id = subscription.subscription_id,
kind = ?subscription.kind,
label = ?subscription.label,
"websocket unsubscribe timed out"
);
},
}
}
let ws_close_result = ws_client.close().await;
match ws_close_result {
Ok(()) => {
tracing::info!(listener_session_id = session.id, "websocket rpc client closed");
},
Err(error) => {
tracing::error!(
listener_session_id = session.id,
error = %error,
"failed to close websocket rpc client"
);
},
}
let session_candidates =
session_candidate_tracker.snapshot_sorted_by_score_desc();
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), ws_source.close()).await;
let session_candidates = session_candidate_tracker.snapshot_sorted_by_score_desc();
tracing::info!(
listener_session_id = session.id,
candidate_count = session_candidates.len(),
@@ -1307,6 +1262,24 @@ pub async fn run_listener_runtime(
"session candidate summary entry"
);
}
for candidate in &session_candidates {
let insert_result =
crate::storage::insert_session_candidate(pool, session.id, candidate).await;
if let Err(error) = insert_result {
tracing::error!(
listener_session_id = session.id,
key = %candidate.key,
error = %error,
"failed to persist session candidate"
);
} else {
tracing::trace!(
listener_session_id = session.id,
key = %candidate.key,
"session candidate persisted"
);
}
}
let high_confidence_candidates = session_candidate_tracker
.snapshot_with_min_confidence(crate::KhbbCandidateConfidence::High);
tracing::info!(

View File

@@ -196,7 +196,6 @@ CREATE TABLE IF NOT EXISTS tracked_pools (
created_at TEXT NOT NULL
);
"#;
let create_tracked_pools_result = sqlx::query(tracked_pools_sql).execute(pool).await;
match create_tracked_pools_result {
Ok(_) => {},
@@ -207,6 +206,29 @@ CREATE TABLE IF NOT EXISTS tracked_pools (
});
},
}
let session_candidates_sql = r#"
CREATE TABLE IF NOT EXISTS session_candidates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listener_session_id INTEGER NOT NULL,
candidate_key TEXT NOT NULL,
category TEXT NOT NULL,
pubkey TEXT NULL,
first_seen_slot INTEGER NOT NULL,
last_seen_slot INTEGER NOT NULL,
seen_count INTEGER NOT NULL,
score INTEGER NOT NULL,
confidence TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id)
)
"#;
let session_candidates_query_result = sqlx::query(session_candidates_sql).execute(pool).await;
if let Err(error) = session_candidates_query_result {
return Err(crate::KhbbError::Database {
context: "create session_candidates table",
message: error.to_string(),
});
}
Ok(())
}
@@ -355,6 +377,91 @@ INSERT INTO raw_ws_messages (
}
}
pub(crate) async fn insert_session_candidate(
pool: &sqlx::SqlitePool,
listener_session_id: i64,
candidate: &crate::KhbbSessionCandidate,
) -> core::result::Result<(), crate::KhbbError> {
let first_seen_slot = i64::try_from(candidate.first_seen_slot);
let first_seen_slot_value = match first_seen_slot {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "convert first_seen_slot to sqlite integer",
message: error.to_string(),
});
},
};
let last_seen_slot = i64::try_from(candidate.last_seen_slot);
let last_seen_slot_value = match last_seen_slot {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "convert last_seen_slot to sqlite integer",
message: error.to_string(),
});
},
};
let seen_count = i64::try_from(candidate.seen_count);
let seen_count_value = match seen_count {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "convert seen_count to sqlite integer",
message: error.to_string(),
});
},
};
let score = i64::try_from(candidate.score);
let score_value = match score {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Runtime {
context: "convert score to sqlite integer",
message: error.to_string(),
});
},
};
let confidence_text = match candidate.confidence {
crate::KhbbCandidateConfidence::Low => "low",
crate::KhbbCandidateConfidence::Medium => "medium",
crate::KhbbCandidateConfidence::High => "high",
};
let insert_result = sqlx::query(
r#"
INSERT INTO session_candidates (
listener_session_id,
candidate_key,
category,
pubkey,
first_seen_slot,
last_seen_slot,
seen_count,
score,
confidence
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
"#,
)
.bind(listener_session_id)
.bind(&candidate.key)
.bind(&candidate.category)
.bind(candidate.pubkey.as_deref())
.bind(first_seen_slot_value)
.bind(last_seen_slot_value)
.bind(seen_count_value)
.bind(score_value)
.bind(confidence_text)
.execute(pool)
.await;
if let Err(error) = insert_result {
return Err(crate::KhbbError::Database {
context: "insert session candidate",
message: error.to_string(),
});
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -537,4 +644,76 @@ WHERE id = ?1;
.await;
assert!(insert_ws_result.is_ok());
}
#[tokio::test]
async fn ensure_sqlite_schema_creates_session_candidates_table() {
let pool_result = create_sqlite_pool("sqlite::memory:").await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("sqlite memory pool");
let ensure_result = ensure_sqlite_schema(&pool).await;
assert!(ensure_result.is_ok());
let row_result = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_candidates'",
)
.fetch_one(&pool)
.await;
assert!(row_result.is_ok());
let count = row_result.expect("session_candidates table count");
assert_eq!(count, 1);
}
#[tokio::test]
async fn insert_session_candidate_inserts_row() {
let pool_result = create_sqlite_pool("sqlite::memory:").await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("sqlite memory pool");
let ensure_result = ensure_sqlite_schema(&pool).await;
assert!(ensure_result.is_ok());
let database_url = build_temp_sqlite_url();
let config = build_test_config(database_url);
let session_result = insert_listener_session(&pool, &config).await;
assert!(session_result.is_ok());
let session = session_result.expect("listener session");
let candidate = crate::KhbbSessionCandidate {
key: std::string::String::from("token_account:SomePubkey"),
category: std::string::String::from("token_account"),
pubkey: Some(std::string::String::from("SomePubkey")),
first_seen_slot: 100,
last_seen_slot: 101,
seen_count: 2,
score: 50,
confidence: crate::KhbbCandidateConfidence::Medium,
};
let insert_result = insert_session_candidate(&pool, session.id, &candidate).await;
assert!(insert_result.is_ok());
let row_result =
sqlx::query_as::<_, (String, String, Option<String>, i64, i64, i64, i64, String)>(
r#"
SELECT
candidate_key,
category,
pubkey,
first_seen_slot,
last_seen_slot,
seen_count,
score,
confidence
FROM session_candidates
WHERE listener_session_id = ?1
"#,
)
.bind(session.id)
.fetch_one(&pool)
.await;
assert!(row_result.is_ok());
let row = row_result.expect("session candidate row");
assert_eq!(row.0, "token_account:SomePubkey");
assert_eq!(row.1, "token_account");
assert_eq!(row.2.as_deref(), Some("SomePubkey"));
assert_eq!(row.3, 100);
assert_eq!(row.4, 101);
assert_eq!(row.5, 2);
assert_eq!(row.6, 50);
assert_eq!(row.7, "medium");
}
}

216
khbb_lib/src/ws_source.rs Normal file
View File

@@ -0,0 +1,216 @@
// file: khbb_lib/src/ws_source.rs
//! Autonomous WebSocket source handle.
//!
//! This module currently provides a thin explicit wrapper around the existing
//! low-level Solana WebSocket JSON-RPC client. It preserves the listener-side
//! contract while keeping the implementation simple and deterministic:
//!
//! - no implicit reconnect
//! - no background retry logic
//! - explicit connect / read / close lifecycle
//! - reuse of the existing `KhbbSolanaWsRpcClient`
//!
//! The goal of this version is to restore a clean, coherent API boundary
//! between `listener.rs` and the WebSocket transport layer before introducing a
//! more advanced multi-source autonomous supervisor.
/// Event emitted by the WebSocket source handle.
#[derive(Debug, Clone)]
pub enum KhbbWsSourceEvent {
/// A classified incoming WebSocket message.
IncomingMessage(crate::KhbbWsIncomingMessage),
}
/// Handle used by the listener runtime to control a WebSocket source.
#[derive(Debug)]
pub struct KhbbWsSourceHandle {
/// Wrapped low-level Solana WebSocket client.
client: crate::KhbbSolanaWsRpcClient,
}
impl KhbbWsSourceHandle {
/// Creates a new source handle from an existing low-level client config.
pub fn spawn(
config: crate::KhbbSolanaWsRpcClientConfig,
) -> core::result::Result<Self, crate::KhbbError> {
let client_result = crate::KhbbSolanaWsRpcClient::new(config);
let client = match client_result {
Ok(value) => value,
Err(error) => {
return Err(error);
}
};
Ok(Self { client })
}
/// Connects the underlying WebSocket client.
pub async fn connect(&mut self) -> core::result::Result<(), crate::KhbbError> {
let connect_result = self.client.connect().await;
match connect_result {
Ok(()) => Ok(()),
Err(error) => Err(error),
}
}
/// Performs a `slotSubscribe` call.
pub async fn slot_subscribe(
&mut self,
id: u64,
) -> core::result::Result<crate::KhbbWsSubscribeCallOutput, crate::KhbbError> {
let subscribe_result = self.client.slot_subscribe(id).await;
match subscribe_result {
Ok(value) => Ok(value),
Err(error) => Err(error),
}
}
/// Performs a `logsSubscribe` call.
pub async fn logs_subscribe(
&mut self,
filter: solana_rpc_client_api::config::RpcTransactionLogsFilter,
config: core::option::Option<solana_rpc_client_api::config::RpcTransactionLogsConfig>,
id: u64,
) -> core::result::Result<crate::KhbbWsSubscribeCallOutput, crate::KhbbError> {
let subscribe_result = self.client.logs_subscribe(filter, config, id).await;
match subscribe_result {
Ok(value) => Ok(value),
Err(error) => Err(error),
}
}
/// Performs a `programSubscribe` call.
pub async fn program_subscribe(
&mut self,
program_id: &str,
config: core::option::Option<solana_rpc_client_api::config::RpcProgramAccountsConfig>,
id: u64,
) -> core::result::Result<crate::KhbbWsSubscribeCallOutput, crate::KhbbError> {
let subscribe_result = self.client.program_subscribe(program_id, config, id).await;
match subscribe_result {
Ok(value) => Ok(value),
Err(error) => Err(error),
}
}
/// Reads the next event emitted by the underlying client.
pub async fn recv_event(
&mut self,
) -> core::result::Result<KhbbWsSourceEvent, crate::KhbbError> {
let read_result = self.client.read_next_incoming_message().await;
match read_result {
Ok(message) => Ok(KhbbWsSourceEvent::IncomingMessage(message)),
Err(error) => Err(error),
}
}
/// Closes the underlying WebSocket client.
pub async fn close(&mut self) -> core::result::Result<(), crate::KhbbError> {
let close_result = self.client.close().await;
match close_result {
Ok(()) => Ok(()),
Err(error) => Err(error),
}
}
}
#[cfg(test)]
mod tests {
/// Verifies that `spawn` rejects an empty URL via the wrapped client config
/// validation.
#[test]
fn spawn_rejects_empty_url() {
let result = super::KhbbWsSourceHandle::spawn(crate::KhbbSolanaWsRpcClientConfig {
url: std::string::String::new(),
});
assert!(result.is_err());
}
/// Verifies that `recv_event` drains a preloaded pending response message.
#[tokio::test]
async fn recv_event_returns_wrapped_response_message() {
let client_result = crate::KhbbSolanaWsRpcClient::new(crate::KhbbSolanaWsRpcClientConfig {
url: std::string::String::from("wss://example.invalid"),
});
let mut client = match client_result {
Ok(value) => value,
Err(error) => {
panic!("unexpected client construction error: {error}");
}
};
client
.pending_incoming_messages
.push_back(crate::KhbbWsIncomingMessage::Response {
raw: std::string::String::from(r#"{"jsonrpc":"2.0","id":1,"result":42}"#),
id: 1,
json: serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": 42,
}),
});
let mut handle = super::KhbbWsSourceHandle { client };
let event_result = handle.recv_event().await;
match event_result {
Ok(super::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::Response { id, .. },
)) => {
assert_eq!(id, 1);
}
Ok(_) => {
panic!("unexpected event variant");
}
Err(error) => {
panic!("unexpected recv_event error: {error}");
}
}
}
/// Verifies that `recv_event` returns a wrapped stream-ended message when a
/// pending stream-ended message is queued.
#[tokio::test]
async fn recv_event_returns_wrapped_stream_ended_message() {
let client_result = crate::KhbbSolanaWsRpcClient::new(crate::KhbbSolanaWsRpcClientConfig {
url: std::string::String::from("wss://example.invalid"),
});
let mut client = match client_result {
Ok(value) => value,
Err(error) => {
panic!("unexpected client construction error: {error}");
}
};
client
.pending_incoming_messages
.push_back(crate::KhbbWsIncomingMessage::StreamEnded);
let mut handle = super::KhbbWsSourceHandle { client };
let event_result = handle.recv_event().await;
match event_result {
Ok(super::KhbbWsSourceEvent::IncomingMessage(
crate::KhbbWsIncomingMessage::StreamEnded,
)) => {}
Ok(_) => {
panic!("unexpected event variant");
}
Err(error) => {
panic!("unexpected recv_event error: {error}");
}
}
}
/// Verifies that `close` is a no-op when the wrapped client is not
/// connected.
#[tokio::test]
async fn close_succeeds_when_client_is_not_connected() {
let handle_result = super::KhbbWsSourceHandle::spawn(crate::KhbbSolanaWsRpcClientConfig {
url: std::string::String::from("wss://example.invalid"),
});
let mut handle = match handle_result {
Ok(value) => value,
Err(error) => {
panic!("unexpected handle construction error: {error}");
}
};
let close_result = handle.close().await;
assert!(close_result.is_ok());
}
}