From 3964452af0c1363fb22d5807404e6c7c99a21fae Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sun, 19 Apr 2026 17:58:33 +0200 Subject: [PATCH] 0.6.1 --- Cargo.toml | 8 +- TODO.md | 396 ++++++++++++++++++++++++++++++++++++++ khbb_lib/src/lib.rs | 5 + khbb_lib/src/listener.rs | 117 +++++------ khbb_lib/src/storage.rs | 181 ++++++++++++++++- khbb_lib/src/ws_source.rs | 216 +++++++++++++++++++++ 6 files changed, 849 insertions(+), 74 deletions(-) create mode 100644 TODO.md create mode 100644 khbb_lib/src/ws_source.rs diff --git a/Cargo.toml b/Cargo.toml index 16ae80a..7203e55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.5.9" +version = "0.6.1" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" @@ -19,6 +19,7 @@ publish = false async-trait = { version = "^0.1", features = [] } base64 = { version = "^0.22", features = [] } chrono = { version = "^0.4", features = ["serde"] } +fs2 = { version = "^0.4", features = [] } futures-util = { version = "^0.3", features = [] } reqwest = { version = "^0.13", default-features = false, features = ["charset", "cookies", "deflate", "form", "gzip", "http2", "json", "multipart", "query", "rustls", "socks", "stream", "zstd"] } rustls = { version = "^0.23", features = ["aws-lc-rs"] } @@ -38,11 +39,16 @@ spl-memo-interface = { version = "^2.0", features = [] } spl-token-interface = { version = "^2.0", features = [] } spl-token-2022-interface = { version = "^2.1", features = [] } sqlx = { version = "^0.8", features = ["chrono", "uuid", "bigdecimal", "json", "sqlite", "runtime-tokio-rustls"] } +tauri = { version = "^2.10", features = ["default"] } +tauri-build = { version = "2", features = [] } +tauri-plugin-tracing = { version = "^0.3", features = [] } tokio = { version = "^1.52", features = ["full"] } tokio-stream = { version = "^0.1", features = ["full"] } tokio-tungstenite = { version = "^0.29", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots", "stream", "url"] } tracing = { version = "^0.1", features = [] } +tracing-appender = { version = "^0.2", features = [] } tracing-subscriber = { version = "^0.3", features = ["ansi", "env-filter", "chrono", "serde", "json"] } +ts-rs = { version = "^12.0", features = [] } yellowstone-grpc-client = { version = "^13.0", features = [] } yellowstone-grpc-proto = { version = "^12.2", features = [] } uuid = { version = "^1.23", features = ["v4", "serde"] } diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..e33538f --- /dev/null +++ b/TODO.md @@ -0,0 +1,396 @@ + + +# Khadhroony Bobot — Global TODO / Roadmap + +> This roadmap is a living document. +> Completed versions reflect the current implemented state. +> Planned versions are indicative and may evolve as architecture and product needs become clearer. + +--- + +## Project goals + +The project is split into several major applications sharing `khbb_lib`: + +- `khbb_listener_app` + - listen to Solana data sources, + - detect token / mint / token-account / bootstrap / pair / pool related activity, + - enrich and store observed data. + +- `khbb_pattern_analyser_app` + - analyze recurring patterns on tokens, mints, pools, names, lifetimes, scam patterns, bootstrap flows, early activity. + +- `khbb_trader_app` + - consume early detections and pattern outputs, + - take automated trading decisions, + - apply entry/exit logic based on price, duration, confidence, and risk. + +--- + +## Current architectural direction + +Current implementation started with a simple inline listener loop in order to validate: + +- Solana HTTP RPC transport, +- Solana WebSocket transport, +- raw data storage, +- event normalization, +- early domain classification, +- heuristic signals, +- HTTP enrichment, +- candidate tracking. + +This is considered a bootstrap architecture. + +### Target architecture direction + +The target architecture should progressively move toward: + +- multiple concurrent data sources, +- autonomous source clients, +- async task-based source runtimes, +- command/event channels, +- source aggregation hub, +- separation between: + - transport, + - normalization, + - enrichment, + - correlation, + - persistence, + - strategy/trading. + +This especially applies to: + +- multiple WebSocket clients, +- Yellowstone gRPC client(s), +- HTTP enrichment workers, +- future external data sources. + +--- + +## Completed versions + +### v0.1.x +- initial project skeleton +- workspace setup +- strict coding conventions +- config loading +- tracing bootstrap +- SQLite bootstrap + +### v0.2.x +- basic listener runtime +- listener session creation +- SQLite connectivity validation +- initial runtime loop +- first persistence primitives + +### v0.3.0 +- HTTP JSON-RPC client foundation +- manual request ids +- use of official Solana RPC request/response types where applicable +- basic `getSlot` support +- raw HTTP RPC storage + +### v0.3.1 +- tests added around config and storage +- tests added around HTTP RPC request/response parsing +- stronger validation of config and SQLite helpers + +### v0.3.2 +- HTTP polling integrated into listener runtime +- `getSlot` polling stored as raw RPC traffic + +### v0.4.1 +- initial Solana WebSocket RPC support +- subscription primitives +- support for: + - `slotSubscribe` + - `logsSubscribe` + - `programSubscribe` +- raw WS message storage + +### v0.4.3 +- centralized WebSocket read loop improvements +- safe unsubscribe handling +- shutdown flow improvements +- Rustls provider initialization fixes + +### v0.4.4 +- centralized WS dispatch between: + - JSON-RPC responses + - notifications +- better separation of response wait logic and notification flow + +### v0.4.5 +- active WS subscription registry +- source metadata attached to subscriptions +- cleaner handling of multiple subscription kinds + +### v0.4.6 +- WS notification normalization layer +- normalized events introduced for: + - slot + - logs + - program notifications + +### v0.5.0 +- first domain event layer +- normalized WS events converted into domain events + +### v0.5.1 +- known program registry +- first known-program classification layer +- SPL Token / Token-2022 / System / ComputeBudget / ATA recognition + +### v0.5.2 +- official `ids.rs` module added +- heuristic signal layer introduced +- first weak signals around: + - token account activity + - mint activity + - initial token activity + - bootstrap-style activity + +### v0.5.3 +- heuristic refinement +- associated token account style signals +- stronger bootstrap-oriented heuristics from known logs + +### v0.5.4 +- targeted HTTP enrichment via `getAccountInfo` +- enriched account snapshot +- first distinction between: + - potential token account + - potential mint account + - unknown account + +### v0.5.5 +- enriched classification layer +- confirmed enriched events introduced: + - confirmed token account activity + - confirmed mint account activity + - unknown token-program-owned account activity + +### v0.5.6 +- local correlation layer between: + - enriched events + - heuristics +- correlated signals introduced: + - confirmed token account update + - potential new token mint + - potential token bootstrap flow + +### v0.5.7 +- candidate layer introduced +- first domain candidates: + - token account candidate + - mint candidate + - bootstrap flow candidate +- listener maximum tick count made configurable + +### v0.5.8 +- in-memory session candidate tracker +- session-local deduplication +- lightweight score and confidence +- per-session candidate upsert flow + +### v0.5.9 +- session candidate snapshots +- sorted candidate summaries +- confidence-based filtering +- end-of-session summary logs + +--- + +## Current status summary + +At the current stage, the project can already: + +- connect to Solana HTTP RPC, +- connect to Solana WebSocket RPC, +- subscribe to slot/logs/program streams, +- store raw HTTP and WS payloads, +- normalize notifications, +- derive domain events, +- classify known programs, +- emit heuristic signals, +- enrich accounts with `getAccountInfo`, +- confirm token-account-like activity, +- correlate signals, +- track session candidates in memory, +- summarize candidates at the end of a run. + +What is still missing is the transition from generic token activity detection to true: + +- mint detection, +- token bootstrap detection, +- pool/pair detection, +- DEX-aware activity detection, +- persistent candidate storage, +- pattern analysis, +- trading integration. + +--- + +## Planned versions + +> Planned versions are provisional and may change. + +### v0.6.0 +- persist session candidates to SQLite +- add dedicated candidate table(s) +- persist summary-worthy candidates at end of listener session +- create base persistence model reusable by analyser and trader + +### v0.6.1 +- refactor WebSocket client into an autonomous async source runtime +- introduce command/event channel model +- separate WS transport orchestration from listener business logic + +### v0.6.2 +- support multiple concurrent WS source clients +- allow different endpoints/providers simultaneously +- introduce source identity and source metadata +- prepare failover / redundancy / specialization by source + +### v0.6.3 +- introduce Yellowstone gRPC source runtime +- normalize WS + gRPC source events into a shared event model +- compare and reconcile overlapping data streams + +### v0.6.4 +- introduce source aggregation hub +- merge multi-source events into a unified ingestion pipeline +- prepare source-level prioritization and deduplication + +### v0.7.0 +- improve token-account vs mint distinction +- decode more token-specific account information +- improve quality of confirmed mint detection +- reduce weak false candidate generation + +### v0.7.1 +- introduce first persistent mint candidates +- introduce first persistent token-account candidates +- session-to-database promotion logic + +### v0.7.2 +- begin real bootstrap flow persistence and tracking +- correlate: + - token accounts + - mint accounts + - bootstrap-like logs + - source timing + +### v0.8.0 +- begin DEX-aware detection layer +- identify relevant program ids for first supported DEXes +- start with a small prioritized list, likely including: + - Raydium + - Meteora + - Pump-like ecosystems +- detect pool/pair creation-related activity + +### v0.8.1 +- add first pair / pool candidate models +- begin liquidity/bootstrap/pair-side inference +- distinguish token-only activity from pair/pool activity + +### v0.8.2 +- add market-related enrichment: + - liquidity hints + - price hints + - pool metadata +- start preparing listener outputs for trading eligibility + +### v0.9.0 +- begin `khbb_pattern_analyser_app` +- consume persisted candidates/signals from listener +- define first recurring pattern models: + - mint lifetime + - suspicious bootstrap + - likely scam patterns + - repeated naming patterns + - repeated creator behavior + +### v0.9.1 +- pattern scoring +- candidate clustering +- repeated-behavior analysis across sessions + +### v1.0.0 +- first end-to-end listener milestone +- persistent candidate ingestion +- multi-source ingestion foundations +- DEX-aware first detection +- pattern analyser initial interoperability + +--- + +## Future trader-oriented roadmap + +### Trader preparation +- define normalized outputs consumable by `khbb_trader_app` +- define candidate confidence / risk / eligibility model +- define basic token entry exclusion filters +- define liquidity / market-cap / bootstrap safety gates + +### Early trading versions +- paper-trading mode first +- wallet integration +- transaction builder integration +- buy/sell strategy primitives +- time-based and price-based exits +- risk caps and kill switches + +--- + +## Cross-cutting technical TODO + +### Transport / source runtime +- [ ] move away from single inline WS runtime model +- [ ] introduce autonomous async WS source tasks +- [ ] add multi-WS support +- [ ] add gRPC source runtime +- [ ] unify source event interface + +### Storage +- [ ] persist session candidates +- [ ] define candidate history tables +- [ ] define mint/pool/pair tables +- [ ] define source tables / provider metadata tables + +### Classification / heuristics +- [ ] reduce weak token-account/mint ambiguity +- [ ] improve ATA detection +- [ ] refine bootstrap-flow detection +- [ ] correlate logs with program/account enrichment more strongly + +### DEX support +- [ ] define first supported DEX list +- [ ] list official program ids and account models +- [ ] detect pair creation events +- [ ] detect liquidity initialization +- [ ] enrich pool metadata + +### Pattern analysis +- [ ] define persistent pattern schema +- [ ] score repeated token behaviors +- [ ] score suspicious creators / repeated rugs / short lifetimes + +### Trading +- [ ] define trader input schema +- [ ] define safe simulation mode +- [ ] define buy filters +- [ ] define sell rules +- [ ] define risk management rules + +--- + +## Notes + +- Current versions intentionally favored iterative validation over final architecture purity. +- The current listener runtime is considered a stepping stone, not the final source-runtime model. +- Candidate/correlation layers may continue to evolve before stabilizing. +- Future versions may be renumbered, merged, split, or reprioritized depending on what real Solana traffic reveals. diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index 581306a..b6b4857 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -30,6 +30,7 @@ mod signal_correlation; mod candidate; mod session_candidate; mod session_tracker; +mod ws_source; /// Runs the listener application bootstrap workflow. pub use crate::app::run_listener_app; @@ -155,3 +156,7 @@ pub use crate::session_candidate::KhbbSessionCandidate; pub use crate::session_tracker::KhbbSessionCandidateTracker; /// Result of inserting or updating a session candidate. pub use crate::session_tracker::KhbbSessionCandidateUpdate; +/// Event emitted by the autonomous WebSocket source. +pub use crate::ws_source::KhbbWsSourceEvent; +/// Handle used by the listener runtime to control a WebSocket source task. +pub use crate::ws_source::KhbbWsSourceHandle; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index b538e86..dfbfeac 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -52,14 +52,14 @@ pub async fn run_listener_runtime( }; let ws_client_config = crate::KhbbSolanaWsRpcClientConfig { url: config.solana_ws_rpc_url.clone() }; - let ws_client_result = crate::KhbbSolanaWsRpcClient::new(ws_client_config); - let mut ws_client = match ws_client_result { + let ws_source_result = crate::KhbbWsSourceHandle::spawn(ws_client_config); + let mut ws_source = match ws_source_result { Ok(value) => value, Err(error) => { return Err(error); }, }; - let ws_connect_result = ws_client.connect().await; + let ws_connect_result = ws_source.connect().await; match ws_connect_result { Ok(()) => { tracing::info!( @@ -75,7 +75,7 @@ pub async fn run_listener_runtime( let mut active_ws_subscriptions = std::vec::Vec::::new(); if config.enable_ws_slot_subscribe { - let slot_subscribe_result = ws_client.slot_subscribe(1).await; + let slot_subscribe_result = ws_source.slot_subscribe(1).await; let slot_subscribe_output = match slot_subscribe_result { Ok(value) => value, Err(error) => { @@ -135,7 +135,7 @@ pub async fn run_listener_runtime( }); } if config.enable_ws_logs_subscribe { - let logs_subscribe_result = ws_client + let logs_subscribe_result = ws_source .logs_subscribe(solana_rpc_client_api::config::RpcTransactionLogsFilter::All, None, 2) .await; let logs_subscribe_output = match logs_subscribe_result { @@ -200,7 +200,7 @@ pub async fn run_listener_runtime( let mut program_request_id: u64 = 10; for program_id in &config.ws_program_subscribe_program_ids { let program_subscribe_result = - ws_client.program_subscribe(program_id, None, program_request_id).await; + ws_source.program_subscribe(program_id, None, program_request_id).await; let program_subscribe_output = match program_subscribe_result { Ok(value) => value, Err(error) => { @@ -315,13 +315,15 @@ pub async fn run_listener_runtime( } let ws_read_timeout_result = tokio::time::timeout( std::time::Duration::from_millis(50), - ws_client.read_next_incoming_message(), + ws_source.recv_event(), ) .await; match ws_read_timeout_result { Ok(read_result) => { match read_result { - Ok(crate::KhbbWsIncomingMessage::Response { raw, id, .. }) => { + Ok(crate::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::Response { raw, id, .. }, + )) => { let insert_ws_message_result = crate::storage::insert_raw_ws_message( pool, session.id, @@ -346,7 +348,9 @@ pub async fn run_listener_runtime( } } } - Ok(crate::KhbbWsIncomingMessage::Notification { raw, method, .. }) => { + Ok(crate::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::Notification { raw, method, .. }, + )) => { let insert_ws_message_result = crate::storage::insert_raw_ws_message( pool, session.id, @@ -1139,7 +1143,9 @@ pub async fn run_listener_runtime( } } } - Ok(crate::KhbbWsIncomingMessage::Unknown { raw, .. }) => { + Ok(crate::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::Unknown { raw, .. }, + )) => { let insert_ws_message_result = crate::storage::insert_raw_ws_message( pool, session.id, @@ -1163,7 +1169,9 @@ pub async fn run_listener_runtime( } } } - Ok(crate::KhbbWsIncomingMessage::StreamEnded) => { + Ok(crate::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::StreamEnded, + )) => { tracing::info!( listener_session_id = session.id, "websocket stream ended" @@ -1177,7 +1185,6 @@ pub async fn run_listener_runtime( error = %error, "failed to read websocket message" ); - final_status = std::string::String::from("ws_read_error"); break; } @@ -1186,6 +1193,11 @@ pub async fn run_listener_runtime( Err(_) => {} } if tick_count >= config.listener_max_ticks { + tracing::info!( + listener_session_id = session.id, + tick_count = tick_count, + "listener reached max ticks, stopping loop" + ); break; } } @@ -1227,67 +1239,10 @@ pub async fn run_listener_runtime( tracing::info!( listener_session_id = session.id, subscription_count = active_ws_subscriptions.len(), - "starting websocket unsubscribe phase" + "starting websocket close phase" ); - for subscription in &active_ws_subscriptions { - let unsubscribe_timeout_result = tokio::time::timeout( - std::time::Duration::from_millis(500), - ws_client.unsubscribe( - subscription.kind, - subscription.subscription_id, - 1000u64.saturating_add(tick_count).saturating_add(subscription.request_id), - ), - ) - .await; - match unsubscribe_timeout_result { - Ok(unsubscribe_result) => match unsubscribe_result { - Ok(value) => { - tracing::info!( - listener_session_id = session.id, - unsubscribed = value, - subscription_id = subscription.subscription_id, - kind = ?subscription.kind, - label = ?subscription.label, - "websocket subscription cancelled" - ); - }, - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - subscription_id = subscription.subscription_id, - kind = ?subscription.kind, - label = ?subscription.label, - "failed to cancel websocket subscription" - ); - }, - }, - Err(_) => { - tracing::error!( - listener_session_id = session.id, - subscription_id = subscription.subscription_id, - kind = ?subscription.kind, - label = ?subscription.label, - "websocket unsubscribe timed out" - ); - }, - } - } - let ws_close_result = ws_client.close().await; - match ws_close_result { - Ok(()) => { - tracing::info!(listener_session_id = session.id, "websocket rpc client closed"); - }, - Err(error) => { - tracing::error!( - listener_session_id = session.id, - error = %error, - "failed to close websocket rpc client" - ); - }, - } - let session_candidates = - session_candidate_tracker.snapshot_sorted_by_score_desc(); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), ws_source.close()).await; + let session_candidates = session_candidate_tracker.snapshot_sorted_by_score_desc(); tracing::info!( listener_session_id = session.id, candidate_count = session_candidates.len(), @@ -1307,6 +1262,24 @@ pub async fn run_listener_runtime( "session candidate summary entry" ); } + for candidate in &session_candidates { + let insert_result = + crate::storage::insert_session_candidate(pool, session.id, candidate).await; + if let Err(error) = insert_result { + tracing::error!( + listener_session_id = session.id, + key = %candidate.key, + error = %error, + "failed to persist session candidate" + ); + } else { + tracing::trace!( + listener_session_id = session.id, + key = %candidate.key, + "session candidate persisted" + ); + } + } let high_confidence_candidates = session_candidate_tracker .snapshot_with_min_confidence(crate::KhbbCandidateConfidence::High); tracing::info!( diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs index f8a5dd9..ec91c83 100644 --- a/khbb_lib/src/storage.rs +++ b/khbb_lib/src/storage.rs @@ -196,7 +196,6 @@ CREATE TABLE IF NOT EXISTS tracked_pools ( created_at TEXT NOT NULL ); "#; - let create_tracked_pools_result = sqlx::query(tracked_pools_sql).execute(pool).await; match create_tracked_pools_result { Ok(_) => {}, @@ -207,6 +206,29 @@ CREATE TABLE IF NOT EXISTS tracked_pools ( }); }, } + let session_candidates_sql = r#" +CREATE TABLE IF NOT EXISTS session_candidates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listener_session_id INTEGER NOT NULL, + candidate_key TEXT NOT NULL, + category TEXT NOT NULL, + pubkey TEXT NULL, + first_seen_slot INTEGER NOT NULL, + last_seen_slot INTEGER NOT NULL, + seen_count INTEGER NOT NULL, + score INTEGER NOT NULL, + confidence TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')), + FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) +) + "#; + let session_candidates_query_result = sqlx::query(session_candidates_sql).execute(pool).await; + if let Err(error) = session_candidates_query_result { + return Err(crate::KhbbError::Database { + context: "create session_candidates table", + message: error.to_string(), + }); + } Ok(()) } @@ -355,6 +377,91 @@ INSERT INTO raw_ws_messages ( } } +pub(crate) async fn insert_session_candidate( + pool: &sqlx::SqlitePool, + listener_session_id: i64, + candidate: &crate::KhbbSessionCandidate, +) -> core::result::Result<(), crate::KhbbError> { + let first_seen_slot = i64::try_from(candidate.first_seen_slot); + let first_seen_slot_value = match first_seen_slot { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "convert first_seen_slot to sqlite integer", + message: error.to_string(), + }); + }, + }; + let last_seen_slot = i64::try_from(candidate.last_seen_slot); + let last_seen_slot_value = match last_seen_slot { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "convert last_seen_slot to sqlite integer", + message: error.to_string(), + }); + }, + }; + let seen_count = i64::try_from(candidate.seen_count); + let seen_count_value = match seen_count { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "convert seen_count to sqlite integer", + message: error.to_string(), + }); + }, + }; + let score = i64::try_from(candidate.score); + let score_value = match score { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Runtime { + context: "convert score to sqlite integer", + message: error.to_string(), + }); + }, + }; + let confidence_text = match candidate.confidence { + crate::KhbbCandidateConfidence::Low => "low", + crate::KhbbCandidateConfidence::Medium => "medium", + crate::KhbbCandidateConfidence::High => "high", + }; + let insert_result = sqlx::query( + r#" +INSERT INTO session_candidates ( + listener_session_id, + candidate_key, + category, + pubkey, + first_seen_slot, + last_seen_slot, + seen_count, + score, + confidence +) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) + "#, + ) + .bind(listener_session_id) + .bind(&candidate.key) + .bind(&candidate.category) + .bind(candidate.pubkey.as_deref()) + .bind(first_seen_slot_value) + .bind(last_seen_slot_value) + .bind(seen_count_value) + .bind(score_value) + .bind(confidence_text) + .execute(pool) + .await; + if let Err(error) = insert_result { + return Err(crate::KhbbError::Database { + context: "insert session candidate", + message: error.to_string(), + }); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -537,4 +644,76 @@ WHERE id = ?1; .await; assert!(insert_ws_result.is_ok()); } + + #[tokio::test] + async fn ensure_sqlite_schema_creates_session_candidates_table() { + let pool_result = create_sqlite_pool("sqlite::memory:").await; + assert!(pool_result.is_ok()); + let pool = pool_result.expect("sqlite memory pool"); + let ensure_result = ensure_sqlite_schema(&pool).await; + assert!(ensure_result.is_ok()); + let row_result = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='session_candidates'", + ) + .fetch_one(&pool) + .await; + assert!(row_result.is_ok()); + let count = row_result.expect("session_candidates table count"); + assert_eq!(count, 1); + } + + #[tokio::test] + async fn insert_session_candidate_inserts_row() { + let pool_result = create_sqlite_pool("sqlite::memory:").await; + assert!(pool_result.is_ok()); + let pool = pool_result.expect("sqlite memory pool"); + let ensure_result = ensure_sqlite_schema(&pool).await; + assert!(ensure_result.is_ok()); + let database_url = build_temp_sqlite_url(); + let config = build_test_config(database_url); + let session_result = insert_listener_session(&pool, &config).await; + assert!(session_result.is_ok()); + let session = session_result.expect("listener session"); + let candidate = crate::KhbbSessionCandidate { + key: std::string::String::from("token_account:SomePubkey"), + category: std::string::String::from("token_account"), + pubkey: Some(std::string::String::from("SomePubkey")), + first_seen_slot: 100, + last_seen_slot: 101, + seen_count: 2, + score: 50, + confidence: crate::KhbbCandidateConfidence::Medium, + }; + let insert_result = insert_session_candidate(&pool, session.id, &candidate).await; + assert!(insert_result.is_ok()); + let row_result = + sqlx::query_as::<_, (String, String, Option, i64, i64, i64, i64, String)>( + r#" +SELECT + candidate_key, + category, + pubkey, + first_seen_slot, + last_seen_slot, + seen_count, + score, + confidence +FROM session_candidates +WHERE listener_session_id = ?1 + "#, + ) + .bind(session.id) + .fetch_one(&pool) + .await; + assert!(row_result.is_ok()); + let row = row_result.expect("session candidate row"); + assert_eq!(row.0, "token_account:SomePubkey"); + assert_eq!(row.1, "token_account"); + assert_eq!(row.2.as_deref(), Some("SomePubkey")); + assert_eq!(row.3, 100); + assert_eq!(row.4, 101); + assert_eq!(row.5, 2); + assert_eq!(row.6, 50); + assert_eq!(row.7, "medium"); + } } diff --git a/khbb_lib/src/ws_source.rs b/khbb_lib/src/ws_source.rs new file mode 100644 index 0000000..3a6896c --- /dev/null +++ b/khbb_lib/src/ws_source.rs @@ -0,0 +1,216 @@ +// file: khbb_lib/src/ws_source.rs + +//! Autonomous WebSocket source handle. +//! +//! This module currently provides a thin explicit wrapper around the existing +//! low-level Solana WebSocket JSON-RPC client. It preserves the listener-side +//! contract while keeping the implementation simple and deterministic: +//! +//! - no implicit reconnect +//! - no background retry logic +//! - explicit connect / read / close lifecycle +//! - reuse of the existing `KhbbSolanaWsRpcClient` +//! +//! The goal of this version is to restore a clean, coherent API boundary +//! between `listener.rs` and the WebSocket transport layer before introducing a +//! more advanced multi-source autonomous supervisor. + +/// Event emitted by the WebSocket source handle. +#[derive(Debug, Clone)] +pub enum KhbbWsSourceEvent { + /// A classified incoming WebSocket message. + IncomingMessage(crate::KhbbWsIncomingMessage), +} + +/// Handle used by the listener runtime to control a WebSocket source. +#[derive(Debug)] +pub struct KhbbWsSourceHandle { + /// Wrapped low-level Solana WebSocket client. + client: crate::KhbbSolanaWsRpcClient, +} + +impl KhbbWsSourceHandle { + /// Creates a new source handle from an existing low-level client config. + pub fn spawn( + config: crate::KhbbSolanaWsRpcClientConfig, + ) -> core::result::Result { + let client_result = crate::KhbbSolanaWsRpcClient::new(config); + let client = match client_result { + Ok(value) => value, + Err(error) => { + return Err(error); + } + }; + Ok(Self { client }) + } + + /// Connects the underlying WebSocket client. + pub async fn connect(&mut self) -> core::result::Result<(), crate::KhbbError> { + let connect_result = self.client.connect().await; + match connect_result { + Ok(()) => Ok(()), + Err(error) => Err(error), + } + } + + /// Performs a `slotSubscribe` call. + pub async fn slot_subscribe( + &mut self, + id: u64, + ) -> core::result::Result { + let subscribe_result = self.client.slot_subscribe(id).await; + match subscribe_result { + Ok(value) => Ok(value), + Err(error) => Err(error), + } + } + + /// Performs a `logsSubscribe` call. + pub async fn logs_subscribe( + &mut self, + filter: solana_rpc_client_api::config::RpcTransactionLogsFilter, + config: core::option::Option, + id: u64, + ) -> core::result::Result { + let subscribe_result = self.client.logs_subscribe(filter, config, id).await; + match subscribe_result { + Ok(value) => Ok(value), + Err(error) => Err(error), + } + } + + /// Performs a `programSubscribe` call. + pub async fn program_subscribe( + &mut self, + program_id: &str, + config: core::option::Option, + id: u64, + ) -> core::result::Result { + let subscribe_result = self.client.program_subscribe(program_id, config, id).await; + match subscribe_result { + Ok(value) => Ok(value), + Err(error) => Err(error), + } + } + + /// Reads the next event emitted by the underlying client. + pub async fn recv_event( + &mut self, + ) -> core::result::Result { + let read_result = self.client.read_next_incoming_message().await; + match read_result { + Ok(message) => Ok(KhbbWsSourceEvent::IncomingMessage(message)), + Err(error) => Err(error), + } + } + + /// Closes the underlying WebSocket client. + pub async fn close(&mut self) -> core::result::Result<(), crate::KhbbError> { + let close_result = self.client.close().await; + match close_result { + Ok(()) => Ok(()), + Err(error) => Err(error), + } + } +} + +#[cfg(test)] +mod tests { + /// Verifies that `spawn` rejects an empty URL via the wrapped client config + /// validation. + #[test] + fn spawn_rejects_empty_url() { + let result = super::KhbbWsSourceHandle::spawn(crate::KhbbSolanaWsRpcClientConfig { + url: std::string::String::new(), + }); + assert!(result.is_err()); + } + + /// Verifies that `recv_event` drains a preloaded pending response message. + #[tokio::test] + async fn recv_event_returns_wrapped_response_message() { + let client_result = crate::KhbbSolanaWsRpcClient::new(crate::KhbbSolanaWsRpcClientConfig { + url: std::string::String::from("wss://example.invalid"), + }); + let mut client = match client_result { + Ok(value) => value, + Err(error) => { + panic!("unexpected client construction error: {error}"); + } + }; + client + .pending_incoming_messages + .push_back(crate::KhbbWsIncomingMessage::Response { + raw: std::string::String::from(r#"{"jsonrpc":"2.0","id":1,"result":42}"#), + id: 1, + json: serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": 42, + }), + }); + let mut handle = super::KhbbWsSourceHandle { client }; + let event_result = handle.recv_event().await; + match event_result { + Ok(super::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::Response { id, .. }, + )) => { + assert_eq!(id, 1); + } + Ok(_) => { + panic!("unexpected event variant"); + } + Err(error) => { + panic!("unexpected recv_event error: {error}"); + } + } + } + + /// Verifies that `recv_event` returns a wrapped stream-ended message when a + /// pending stream-ended message is queued. + #[tokio::test] + async fn recv_event_returns_wrapped_stream_ended_message() { + let client_result = crate::KhbbSolanaWsRpcClient::new(crate::KhbbSolanaWsRpcClientConfig { + url: std::string::String::from("wss://example.invalid"), + }); + let mut client = match client_result { + Ok(value) => value, + Err(error) => { + panic!("unexpected client construction error: {error}"); + } + }; + client + .pending_incoming_messages + .push_back(crate::KhbbWsIncomingMessage::StreamEnded); + let mut handle = super::KhbbWsSourceHandle { client }; + let event_result = handle.recv_event().await; + match event_result { + Ok(super::KhbbWsSourceEvent::IncomingMessage( + crate::KhbbWsIncomingMessage::StreamEnded, + )) => {} + Ok(_) => { + panic!("unexpected event variant"); + } + Err(error) => { + panic!("unexpected recv_event error: {error}"); + } + } + } + + /// Verifies that `close` is a no-op when the wrapped client is not + /// connected. + #[tokio::test] + async fn close_succeeds_when_client_is_not_connected() { + let handle_result = super::KhbbWsSourceHandle::spawn(crate::KhbbSolanaWsRpcClientConfig { + url: std::string::String::from("wss://example.invalid"), + }); + let mut handle = match handle_result { + Ok(value) => value, + Err(error) => { + panic!("unexpected handle construction error: {error}"); + } + }; + let close_result = handle.close().await; + assert!(close_result.is_ok()); + } +}