diff --git a/Cargo.toml b/Cargo.toml index ae93392..d6f8700 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.1.0" +version = "0.2.0" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/app.rs b/khbb_lib/src/app.rs index 1818b79..609aa4d 100644 --- a/khbb_lib/src/app.rs +++ b/khbb_lib/src/app.rs @@ -1,12 +1,8 @@ // file: khbb_lib/src/app.rs +//! Application bootstrap for khbb binaries. + /// Runs the initial listener application workflow. -/// -/// This first version only: -/// - loads configuration -/// - opens the SQLite connection pool -/// - verifies connectivity -/// - keeps the runtime alive as the future integration point for listener tasks pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), crate::KhbbError> { let config_result = crate::KhbbAppConfig::load_from_json_file(config_path).await; let config = match config_result { @@ -27,21 +23,28 @@ pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), cra solana_http_rpc_url = %config.solana_http_rpc_url, solana_ws_rpc_url = %config.solana_ws_rpc_url, yellowstone_grpc_url = ?config.yellowstone_grpc_url, + bootstrap_database = config.bootstrap_database, + listener_poll_interval_ms = config.listener_poll_interval_ms, "khbb listener app starting" ); - let connect_result = sqlx::sqlite::SqlitePoolOptions::new() - .max_connections(1) - .connect(&config.database_url) - .await; - let pool = match connect_result { + let pool_result = crate::create_sqlite_pool(&config.database_url).await; + let pool = match pool_result { Ok(value) => value, Err(error) => { - return Err(crate::KhbbError::Database { - context: "connect sqlite pool", - message: error.to_string(), - }); + return Err(error); }, }; + if config.bootstrap_database { + let schema_result = crate::ensure_sqlite_schema(&pool).await; + match schema_result { + Ok(()) => { + tracing::info!("sqlite schema bootstrap succeeded"); + }, + Err(error) => { + return Err(error); + }, + } + } let ping_result = sqlx::query("SELECT 1;").execute(&pool).await; match ping_result { Ok(_) => { @@ -54,6 +57,9 @@ pub async fn run_listener_app(config_path: &str) -> core::result::Result<(), cra }); }, } - tracing::info!("listener tasks are not wired yet"); - Ok(()) + let listener_result = crate::run_listener_runtime(&pool, &config).await; + match listener_result { + Ok(()) => Ok(()), + Err(error) => Err(error), + } } diff --git a/khbb_lib/src/config.rs b/khbb_lib/src/config.rs index 8f22822..335ed27 100644 --- a/khbb_lib/src/config.rs +++ b/khbb_lib/src/config.rs @@ -1,5 +1,7 @@ // file: khbb_lib/src/config.rs +//! Configuration loading and validation for khbb applications. + /// Root application configuration used by the initial listener stack. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct KhbbAppConfig { @@ -13,6 +15,10 @@ pub struct KhbbAppConfig { pub yellowstone_grpc_url: std::option::Option, /// Tracing filter string. pub log_filter: std::string::String, + /// Enables or disables database schema bootstrap at startup. + pub bootstrap_database: bool, + /// Polling interval used by the current runtime skeleton. + pub listener_poll_interval_ms: u64, } impl KhbbAppConfig { @@ -44,6 +50,7 @@ impl KhbbAppConfig { Err(error) => Err(error), } } + /// Validates the application configuration. pub fn validate(&self) -> core::result::Result<(), crate::KhbbError> { if self.database_url.trim().is_empty() { @@ -66,6 +73,13 @@ impl KhbbAppConfig { message: std::string::String::from("log_filter must not be empty"), }); } + if self.listener_poll_interval_ms == 0 { + return Err(crate::KhbbError::Config { + message: std::string::String::from( + "listener_poll_interval_ms must be greater than 0", + ), + }); + } Ok(()) } } diff --git a/khbb_lib/src/domain.rs b/khbb_lib/src/domain.rs new file mode 100644 index 0000000..6097273 --- /dev/null +++ b/khbb_lib/src/domain.rs @@ -0,0 +1,37 @@ +// file: khbb_lib/src/domain.rs + +//! Domain types for the listener and storage layers. + +/// Runtime information recorded for a listener session. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KhbbListenerSession { + /// Database identifier. + pub id: i64, + /// UTC creation timestamp in RFC3339 format. + pub started_at: std::string::String, + /// Current runtime status. + pub status: std::string::String, + /// HTTP RPC endpoint used by the listener. + pub solana_http_rpc_url: std::string::String, + /// WebSocket RPC endpoint used by the listener. + pub solana_ws_rpc_url: std::string::String, + /// Optional Yellowstone gRPC endpoint used by the listener. + pub yellowstone_grpc_url: std::option::Option, +} + +/// Minimal tracked token projection stored by the listener. +/// +/// This is only a first storage-oriented skeleton and will be extended later. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub(crate) struct KhbbTrackedToken { + /// Database identifier. + pub id: i64, + /// Mint address in base58 form. + pub mint_address: std::string::String, + /// Optional token symbol if known. + pub symbol: std::option::Option, + /// Optional token name if known. + pub name: std::option::Option, + /// UTC creation timestamp in RFC3339 format. + pub created_at: std::string::String, +} diff --git a/khbb_lib/src/error.rs b/khbb_lib/src/error.rs index f9d4540..340d6b0 100644 --- a/khbb_lib/src/error.rs +++ b/khbb_lib/src/error.rs @@ -1,5 +1,7 @@ // file: khbb_lib/src/error.rs +//! Error definitions for the khbb library. + /// Main error type used across the khbb workspace. /// /// This project intentionally uses a single explicit error enum instead of diff --git a/khbb_lib/src/lib.rs b/khbb_lib/src/lib.rs index 456380e..a6ad36b 100644 --- a/khbb_lib/src/lib.rs +++ b/khbb_lib/src/lib.rs @@ -2,22 +2,34 @@ //! Core public library for the `khadhroony-bobot` workspace. //! -//! This crate exposes the reusable building blocks shared by the khbb -//! applications, starting with the listener runtime bootstrap. +//! This crate exposes the reusable components shared by the khbb binaries, +//! starting with configuration loading, tracing initialization, SQLite storage +//! bootstrap and the initial listener runtime skeleton. #![deny(unreachable_pub)] #![warn(missing_docs)] mod app; mod config; +mod domain; mod error; +mod listener; +mod storage; mod tracing_setup; -/// Public re-exports for the khbb core library. +/// Runs the listener application bootstrap workflow. pub use crate::app::run_listener_app; -/// Public re-exports for configuration loading. +/// Root configuration of the khbb applications. pub use crate::config::KhbbAppConfig; -/// Public re-exports for configuration loading errors and runtime errors. +/// Database session information for a listener runtime instance. +pub use crate::domain::KhbbListenerSession; +/// Main explicit error type used by the library. pub use crate::error::KhbbError; -/// Public re-exports for tracing initialization. +/// Runs the current listener runtime skeleton. +pub use crate::listener::run_listener_runtime; +/// Creates and initializes the SQLite storage layer. +pub use crate::storage::create_sqlite_pool; +/// Ensures the SQLite schema is present. +pub use crate::storage::ensure_sqlite_schema; +/// Initializes the tracing subscriber for the process. pub use crate::tracing_setup::init_tracing; diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs new file mode 100644 index 0000000..2df6c22 --- /dev/null +++ b/khbb_lib/src/listener.rs @@ -0,0 +1,88 @@ +// file: khbb_lib/src/listener.rs +//! Listener runtime skeleton. +//! +//! This module does not yet connect to Solana RPC, WebSocket or gRPC streams. +//! It prepares the runtime structure, persistence hooks and shutdown behavior. + +/// Runs the current listener runtime skeleton. +pub async fn run_listener_runtime( + pool: &sqlx::SqlitePool, + config: &crate::KhbbAppConfig, +) -> core::result::Result<(), crate::KhbbError> { + let session_result = crate::storage::insert_listener_session(pool, config).await; + let session = match session_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; + tracing::info!( + listener_session_id = session.id, + solana_http_rpc_url = %session.solana_http_rpc_url, + solana_ws_rpc_url = %session.solana_ws_rpc_url, + yellowstone_grpc_url = ?session.yellowstone_grpc_url, + "listener runtime initialized" + ); + let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms); + let mut interval = tokio::time::interval(tick_duration); + let mut tick_count: u64 = 0; + loop { + tokio::select! { + _ = interval.tick() => { + tick_count = tick_count.saturating_add(1); + + tracing::trace!( + listener_session_id = session.id, + tick_count = tick_count, + "listener runtime heartbeat" + ); + if tick_count >= 3 { + break; + } + } + ctrl_c_result = tokio::signal::ctrl_c() => { + match ctrl_c_result { + Ok(()) => { + tracing::info!( + listener_session_id = session.id, + "ctrl-c received, shutting down listener runtime" + ); + break; + } + Err(error) => { + let status_update_result = crate::storage::update_listener_session_status( + pool, + session.id, + "signal_error", + ).await; + match status_update_result { + Ok(()) => {} + Err(update_error) => { + tracing::error!( + listener_session_id = session.id, + update_error = %update_error, + "failed to update listener session status after signal error" + ); + } + } + return Err(crate::KhbbError::Runtime { + context: "wait for ctrl-c", + message: error.to_string(), + }); + } + } + } + } + } + let status_update_result = + crate::storage::update_listener_session_status(pool, session.id, "stopped").await; + match status_update_result { + Ok(()) => { + tracing::info!(listener_session_id = session.id, "listener runtime stopped"); + }, + Err(error) => { + return Err(error); + }, + } + Ok(()) +} diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs new file mode 100644 index 0000000..47b82b1 --- /dev/null +++ b/khbb_lib/src/storage.rs @@ -0,0 +1,268 @@ +// file: khbb_lib/src/storage.rs + +//! SQLite storage bootstrap and persistence helpers. + +/// Creates a SQLite pool for the khbb runtime. +pub async fn create_sqlite_pool( + database_url: &str, +) -> core::result::Result { + let sqlite_path = if let Some(value) = database_url.strip_prefix("sqlite://") { + value + } else if let Some(value) = database_url.strip_prefix("sqlite:") { + value + } else { + "" + }; + if !sqlite_path.is_empty() { + let path = std::path::Path::new(sqlite_path); + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + let create_dir_result = tokio::fs::create_dir_all(parent).await; + match create_dir_result { + Ok(()) => {} + Err(error) => { + return Err(crate::KhbbError::Io { + context: "create sqlite parent directory", + message: error.to_string(), + }); + } + } + } + } + } + let parse_result = ::from_str( + database_url, + ); + let connect_options = match parse_result { + Ok(value) => value.create_if_missing(true), + Err(error) => { + return Err(crate::KhbbError::Config { + message: std::format!( + "invalid sqlite database url `{database_url}`: {}", + error + ), + }); + } + }; + let connect_result = sqlx::sqlite::SqlitePoolOptions::new() + .max_connections(1) + .connect_with(connect_options) + .await; + match connect_result { + Ok(pool) => Ok(pool), + Err(error) => Err(crate::KhbbError::Database { + context: "connect sqlite pool", + message: error.to_string(), + }), + } +} + +/// Ensures that the initial SQLite schema exists. +pub async fn ensure_sqlite_schema( + pool: &sqlx::SqlitePool, +) -> core::result::Result<(), crate::KhbbError> { + let pragma_foreign_keys_result = sqlx::query("PRAGMA foreign_keys = ON;").execute(pool).await; + match pragma_foreign_keys_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "enable sqlite foreign keys", + message: error.to_string(), + }); + }, + } + let listener_sessions_sql = r#" +CREATE TABLE IF NOT EXISTS listener_sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at TEXT NOT NULL, + status TEXT NOT NULL, + solana_http_rpc_url TEXT NOT NULL, + solana_ws_rpc_url TEXT NOT NULL, + yellowstone_grpc_url TEXT NULL +); +"#; + let create_listener_sessions_result = sqlx::query(listener_sessions_sql).execute(pool).await; + match create_listener_sessions_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create listener_sessions table", + message: error.to_string(), + }); + }, + } + let raw_http_rpc_messages_sql = r#" +CREATE TABLE IF NOT EXISTS raw_http_rpc_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listener_session_id INTEGER NOT NULL, + method TEXT NOT NULL, + request_body TEXT NOT NULL, + response_body TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) +); +"#; + let create_raw_http_rpc_messages_result = + sqlx::query(raw_http_rpc_messages_sql).execute(pool).await; + match create_raw_http_rpc_messages_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create raw_http_rpc_messages table", + message: error.to_string(), + }); + }, + } + let raw_ws_messages_sql = r#" +CREATE TABLE IF NOT EXISTS raw_ws_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listener_session_id INTEGER NOT NULL, + direction TEXT NOT NULL, + message_text TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) +); +"#; + let create_raw_ws_messages_result = sqlx::query(raw_ws_messages_sql).execute(pool).await; + match create_raw_ws_messages_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create raw_ws_messages table", + message: error.to_string(), + }); + }, + } + let raw_grpc_messages_sql = r#" +CREATE TABLE IF NOT EXISTS raw_grpc_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + listener_session_id INTEGER NOT NULL, + stream_name TEXT NOT NULL, + message_json TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) +); +"#; + let create_raw_grpc_messages_result = sqlx::query(raw_grpc_messages_sql).execute(pool).await; + match create_raw_grpc_messages_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create raw_grpc_messages table", + message: error.to_string(), + }); + }, + } + let tracked_tokens_sql = r#" +CREATE TABLE IF NOT EXISTS tracked_tokens ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mint_address TEXT NOT NULL UNIQUE, + symbol TEXT NULL, + name TEXT NULL, + created_at TEXT NOT NULL +); +"#; + let create_tracked_tokens_result = sqlx::query(tracked_tokens_sql).execute(pool).await; + match create_tracked_tokens_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create tracked_tokens table", + message: error.to_string(), + }); + }, + } + let tracked_pools_sql = r#" +CREATE TABLE IF NOT EXISTS tracked_pools ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + dex_name TEXT NOT NULL, + pool_address TEXT NOT NULL UNIQUE, + token_a_mint TEXT NOT NULL, + token_b_mint TEXT NOT NULL, + created_at TEXT NOT NULL +); +"#; + + let create_tracked_pools_result = sqlx::query(tracked_pools_sql).execute(pool).await; + match create_tracked_pools_result { + Ok(_) => {}, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "create tracked_pools table", + message: error.to_string(), + }); + }, + } + Ok(()) +} + +/// Inserts a new listener session row and returns the created session object. +pub(crate) async fn insert_listener_session( + pool: &sqlx::SqlitePool, + config: &crate::KhbbAppConfig, +) -> core::result::Result { + let started_at = chrono::Utc::now().to_rfc3339(); + let status = std::string::String::from("running"); + let insert_result = sqlx::query( + r#" +INSERT INTO listener_sessions ( + started_at, + status, + solana_http_rpc_url, + solana_ws_rpc_url, + yellowstone_grpc_url +) VALUES (?1, ?2, ?3, ?4, ?5); +"#, + ) + .bind(&started_at) + .bind(&status) + .bind(&config.solana_http_rpc_url) + .bind(&config.solana_ws_rpc_url) + .bind(&config.yellowstone_grpc_url) + .execute(pool) + .await; + let query_result = match insert_result { + Ok(value) => value, + Err(error) => { + return Err(crate::KhbbError::Database { + context: "insert listener session", + message: error.to_string(), + }); + }, + }; + let session = crate::KhbbListenerSession { + id: query_result.last_insert_rowid(), + started_at, + status, + solana_http_rpc_url: config.solana_http_rpc_url.clone(), + solana_ws_rpc_url: config.solana_ws_rpc_url.clone(), + yellowstone_grpc_url: config.yellowstone_grpc_url.clone(), + }; + Ok(session) +} + +/// Updates the status of an existing listener session. +pub(crate) async fn update_listener_session_status( + pool: &sqlx::SqlitePool, + session_id: i64, + status: &str, +) -> core::result::Result<(), crate::KhbbError> { + let update_result = sqlx::query( + r#" +UPDATE listener_sessions +SET status = ?1 +WHERE id = ?2; +"#, + ) + .bind(status) + .bind(session_id) + .execute(pool) + .await; + match update_result { + Ok(_) => Ok(()), + Err(error) => Err(crate::KhbbError::Database { + context: "update listener session status", + message: error.to_string(), + }), + } +} diff --git a/khbb_lib/src/tracing_setup.rs b/khbb_lib/src/tracing_setup.rs index a9f31d0..0c513ed 100644 --- a/khbb_lib/src/tracing_setup.rs +++ b/khbb_lib/src/tracing_setup.rs @@ -1,5 +1,7 @@ // file: khbb_lib/src/tracing_setup.rs +//! Tracing subscriber initialization. + /// Initializes tracing subscribers for the application. pub fn init_tracing(log_filter: &str) -> core::result::Result<(), crate::KhbbError> { let env_filter_result = @@ -11,7 +13,7 @@ pub fn init_tracing(log_filter: &str) -> core::result::Result<(), crate::KhbbErr context: "build env filter", message: error.to_string(), }); - }, + } }; let subscriber = tracing_subscriber::fmt() .with_env_filter(env_filter) diff --git a/khbb_listener_app/src/main.rs b/khbb_listener_app/src/main.rs index 7337926..12a0f18 100644 --- a/khbb_listener_app/src/main.rs +++ b/khbb_listener_app/src/main.rs @@ -8,9 +8,6 @@ #![warn(missing_docs)] /// Entrypoint of the khbb listener binary. -/// -/// This binary is intentionally thin and delegates all business logic to -/// `khbb_lib`. #[tokio::main] async fn main() -> std::process::ExitCode { let args = std::env::args().collect::>();