// file: khbb_lib/src/storage.rs //! SQLite storage bootstrap and persistence helpers. /// Creates a SQLite pool for the khbb runtime. pub async fn create_sqlite_pool( database_url: &str, ) -> core::result::Result { let sqlite_path = if let Some(value) = database_url.strip_prefix("sqlite://") { value } else if let Some(value) = database_url.strip_prefix("sqlite:") { value } else { "" }; if !sqlite_path.is_empty() { let path = std::path::Path::new(sqlite_path); if let Some(parent) = path.parent() { if !parent.as_os_str().is_empty() { let create_dir_result = tokio::fs::create_dir_all(parent).await; match create_dir_result { Ok(()) => {} Err(error) => { return Err(crate::KhbbError::Io { context: "create sqlite parent directory", message: error.to_string(), }); } } } } } let parse_result = ::from_str( database_url, ); let connect_options = match parse_result { Ok(value) => value.create_if_missing(true), Err(error) => { return Err(crate::KhbbError::Config { message: std::format!( "invalid sqlite database url `{database_url}`: {}", error ), }); } }; let connect_result = sqlx::sqlite::SqlitePoolOptions::new() .max_connections(1) .connect_with(connect_options) .await; match connect_result { Ok(pool) => Ok(pool), Err(error) => Err(crate::KhbbError::Database { context: "connect sqlite pool", message: error.to_string(), }), } } /// Ensures that the initial SQLite schema exists. pub async fn ensure_sqlite_schema( pool: &sqlx::SqlitePool, ) -> core::result::Result<(), crate::KhbbError> { let pragma_foreign_keys_result = sqlx::query("PRAGMA foreign_keys = ON;").execute(pool).await; match pragma_foreign_keys_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "enable sqlite foreign keys", message: error.to_string(), }); }, } let listener_sessions_sql = r#" CREATE TABLE IF NOT EXISTS listener_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, started_at TEXT NOT NULL, status TEXT NOT NULL, solana_http_rpc_url TEXT NOT NULL, solana_ws_rpc_url TEXT NOT NULL, yellowstone_grpc_url TEXT NULL ); "#; let create_listener_sessions_result = sqlx::query(listener_sessions_sql).execute(pool).await; match create_listener_sessions_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create listener_sessions table", message: error.to_string(), }); }, } let raw_http_rpc_messages_sql = r#" CREATE TABLE IF NOT EXISTS raw_http_rpc_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, listener_session_id INTEGER NOT NULL, method TEXT NOT NULL, request_body TEXT NOT NULL, response_body TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) ); "#; let create_raw_http_rpc_messages_result = sqlx::query(raw_http_rpc_messages_sql).execute(pool).await; match create_raw_http_rpc_messages_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create raw_http_rpc_messages table", message: error.to_string(), }); }, } let raw_ws_messages_sql = r#" CREATE TABLE IF NOT EXISTS raw_ws_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, listener_session_id INTEGER NOT NULL, direction TEXT NOT NULL, message_text TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) ); "#; let create_raw_ws_messages_result = sqlx::query(raw_ws_messages_sql).execute(pool).await; match create_raw_ws_messages_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create raw_ws_messages table", message: error.to_string(), }); }, } let raw_grpc_messages_sql = r#" CREATE TABLE IF NOT EXISTS raw_grpc_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, listener_session_id INTEGER NOT NULL, stream_name TEXT NOT NULL, message_json TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) ); "#; let create_raw_grpc_messages_result = sqlx::query(raw_grpc_messages_sql).execute(pool).await; match create_raw_grpc_messages_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create raw_grpc_messages table", message: error.to_string(), }); }, } let tracked_tokens_sql = r#" CREATE TABLE IF NOT EXISTS tracked_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, mint_address TEXT NOT NULL UNIQUE, symbol TEXT NULL, name TEXT NULL, created_at TEXT NOT NULL ); "#; let create_tracked_tokens_result = sqlx::query(tracked_tokens_sql).execute(pool).await; match create_tracked_tokens_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create tracked_tokens table", message: error.to_string(), }); }, } let tracked_pools_sql = r#" CREATE TABLE IF NOT EXISTS tracked_pools ( id INTEGER PRIMARY KEY AUTOINCREMENT, dex_name TEXT NOT NULL, pool_address TEXT NOT NULL UNIQUE, token_a_mint TEXT NOT NULL, token_b_mint TEXT NOT NULL, created_at TEXT NOT NULL ); "#; let create_tracked_pools_result = sqlx::query(tracked_pools_sql).execute(pool).await; match create_tracked_pools_result { Ok(_) => {}, Err(error) => { return Err(crate::KhbbError::Database { context: "create tracked_pools table", message: error.to_string(), }); }, } Ok(()) } /// Inserts a new listener session row and returns the created session object. pub(crate) async fn insert_listener_session( pool: &sqlx::SqlitePool, config: &crate::KhbbAppConfig, ) -> core::result::Result { let started_at = chrono::Utc::now().to_rfc3339(); let status = std::string::String::from("running"); let insert_result = sqlx::query( r#" INSERT INTO listener_sessions ( started_at, status, solana_http_rpc_url, solana_ws_rpc_url, yellowstone_grpc_url ) VALUES (?1, ?2, ?3, ?4, ?5); "#, ) .bind(&started_at) .bind(&status) .bind(&config.solana_http_rpc_url) .bind(&config.solana_ws_rpc_url) .bind(&config.yellowstone_grpc_url) .execute(pool) .await; let query_result = match insert_result { Ok(value) => value, Err(error) => { return Err(crate::KhbbError::Database { context: "insert listener session", message: error.to_string(), }); }, }; let session = crate::KhbbListenerSession { id: query_result.last_insert_rowid(), started_at, status, solana_http_rpc_url: config.solana_http_rpc_url.clone(), solana_ws_rpc_url: config.solana_ws_rpc_url.clone(), yellowstone_grpc_url: config.yellowstone_grpc_url.clone(), }; Ok(session) } /// Updates the status of an existing listener session. pub(crate) async fn update_listener_session_status( pool: &sqlx::SqlitePool, session_id: i64, status: &str, ) -> core::result::Result<(), crate::KhbbError> { let update_result = sqlx::query( r#" UPDATE listener_sessions SET status = ?1 WHERE id = ?2; "#, ) .bind(status) .bind(session_id) .execute(pool) .await; match update_result { Ok(_) => Ok(()), Err(error) => Err(crate::KhbbError::Database { context: "update listener session status", message: error.to_string(), }), } }