Files
khadhroony-bobot/khbb_lib/src/storage.rs
2026-04-18 23:40:49 +02:00

541 lines
18 KiB
Rust

// file: khbb_lib/src/storage.rs
//! SQLite storage bootstrap and persistence helpers.
fn extract_sqlite_file_path(
database_url: &str,
) -> core::result::Result<std::option::Option<std::path::PathBuf>, crate::KhbbError> {
if database_url == "sqlite::memory:" {
return Ok(None);
}
if let Some(value) = database_url.strip_prefix("sqlite://") {
return Ok(Some(std::path::PathBuf::from(value)));
}
if let Some(value) = database_url.strip_prefix("sqlite:") {
return Ok(Some(std::path::PathBuf::from(value)));
}
Err(crate::KhbbError::Config {
message: std::format!("invalid sqlite database url `{database_url}`"),
})
}
/// Creates a SQLite pool for the khbb runtime.
pub async fn create_sqlite_pool(
database_url: &str,
) -> core::result::Result<sqlx::SqlitePool, crate::KhbbError> {
let sqlite_path_result = extract_sqlite_file_path(database_url);
let sqlite_path = match sqlite_path_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
if let Some(path) = sqlite_path {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
let create_dir_result = tokio::fs::create_dir_all(parent).await;
match create_dir_result {
Ok(()) => {},
Err(error) => {
return Err(crate::KhbbError::Io {
context: "create sqlite parent directory",
message: error.to_string(),
});
},
}
}
}
}
let parse_result =
<sqlx::sqlite::SqliteConnectOptions as std::str::FromStr>::from_str(database_url);
let connect_options = match parse_result {
Ok(value) => value.create_if_missing(true),
Err(error) => {
return Err(crate::KhbbError::Config {
message: std::format!("invalid sqlite database url `{database_url}`: {}", error),
});
},
};
let connect_result = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(1)
.connect_with(connect_options)
.await;
match connect_result {
Ok(pool) => Ok(pool),
Err(error) => Err(crate::KhbbError::Database {
context: "connect sqlite pool",
message: error.to_string(),
}),
}
}
/// Ensures that the initial SQLite schema exists.
pub async fn ensure_sqlite_schema(
pool: &sqlx::SqlitePool,
) -> core::result::Result<(), crate::KhbbError> {
let pragma_foreign_keys_result = sqlx::query("PRAGMA foreign_keys = ON;").execute(pool).await;
match pragma_foreign_keys_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "enable sqlite foreign keys",
message: error.to_string(),
});
},
}
let listener_sessions_sql = r#"
CREATE TABLE IF NOT EXISTS listener_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
status TEXT NOT NULL,
solana_http_rpc_url TEXT NOT NULL,
solana_ws_rpc_url TEXT NOT NULL,
yellowstone_grpc_url TEXT NULL
);
"#;
let create_listener_sessions_result = sqlx::query(listener_sessions_sql).execute(pool).await;
match create_listener_sessions_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create listener_sessions table",
message: error.to_string(),
});
},
}
let raw_http_rpc_messages_sql = r#"
CREATE TABLE IF NOT EXISTS raw_http_rpc_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listener_session_id INTEGER NOT NULL,
request_id INTEGER NOT NULL,
method TEXT NOT NULL,
request_body TEXT NOT NULL,
response_body TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id)
);
"#;
let create_raw_http_rpc_messages_result =
sqlx::query(raw_http_rpc_messages_sql).execute(pool).await;
match create_raw_http_rpc_messages_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create raw_http_rpc_messages table",
message: error.to_string(),
});
},
}
let raw_ws_messages_sql = r#"
CREATE TABLE IF NOT EXISTS raw_ws_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listener_session_id INTEGER NOT NULL,
direction TEXT NOT NULL,
message_text TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id)
);
"#;
let create_raw_ws_messages_result = sqlx::query(raw_ws_messages_sql).execute(pool).await;
match create_raw_ws_messages_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create raw_ws_messages table",
message: error.to_string(),
});
},
}
let raw_grpc_messages_sql = r#"
CREATE TABLE IF NOT EXISTS raw_grpc_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listener_session_id INTEGER NOT NULL,
stream_name TEXT NOT NULL,
message_json TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id)
);
"#;
let create_raw_grpc_messages_result = sqlx::query(raw_grpc_messages_sql).execute(pool).await;
match create_raw_grpc_messages_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create raw_grpc_messages table",
message: error.to_string(),
});
},
}
let tracked_tokens_sql = r#"
CREATE TABLE IF NOT EXISTS tracked_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mint_address TEXT NOT NULL UNIQUE,
symbol TEXT NULL,
name TEXT NULL,
created_at TEXT NOT NULL
);
"#;
let create_tracked_tokens_result = sqlx::query(tracked_tokens_sql).execute(pool).await;
match create_tracked_tokens_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create tracked_tokens table",
message: error.to_string(),
});
},
}
let tracked_pools_sql = r#"
CREATE TABLE IF NOT EXISTS tracked_pools (
id INTEGER PRIMARY KEY AUTOINCREMENT,
dex_name TEXT NOT NULL,
pool_address TEXT NOT NULL UNIQUE,
token_a_mint TEXT NOT NULL,
token_b_mint TEXT NOT NULL,
created_at TEXT NOT NULL
);
"#;
let create_tracked_pools_result = sqlx::query(tracked_pools_sql).execute(pool).await;
match create_tracked_pools_result {
Ok(_) => {},
Err(error) => {
return Err(crate::KhbbError::Database {
context: "create tracked_pools table",
message: error.to_string(),
});
},
}
Ok(())
}
/// Inserts a new listener session row and returns the created session object.
pub(crate) async fn insert_listener_session(
pool: &sqlx::SqlitePool,
config: &crate::KhbbAppConfig,
) -> core::result::Result<crate::KhbbListenerSession, crate::KhbbError> {
let started_at = chrono::Utc::now().to_rfc3339();
let status = std::string::String::from("running");
let insert_result = sqlx::query(
r#"
INSERT INTO listener_sessions (
started_at,
status,
solana_http_rpc_url,
solana_ws_rpc_url,
yellowstone_grpc_url
) VALUES (?1, ?2, ?3, ?4, ?5);
"#,
)
.bind(&started_at)
.bind(&status)
.bind(&config.solana_http_rpc_url)
.bind(&config.solana_ws_rpc_url)
.bind(&config.yellowstone_grpc_url)
.execute(pool)
.await;
let query_result = match insert_result {
Ok(value) => value,
Err(error) => {
return Err(crate::KhbbError::Database {
context: "insert listener session",
message: error.to_string(),
});
},
};
let session = crate::KhbbListenerSession {
id: query_result.last_insert_rowid(),
started_at,
status,
solana_http_rpc_url: config.solana_http_rpc_url.clone(),
solana_ws_rpc_url: config.solana_ws_rpc_url.clone(),
yellowstone_grpc_url: config.yellowstone_grpc_url.clone(),
};
Ok(session)
}
/// Updates the status of an existing listener session.
pub(crate) async fn update_listener_session_status(
pool: &sqlx::SqlitePool,
session_id: i64,
status: &str,
) -> core::result::Result<(), crate::KhbbError> {
let update_result = sqlx::query(
r#"
UPDATE listener_sessions
SET status = ?1
WHERE id = ?2;
"#,
)
.bind(status)
.bind(session_id)
.execute(pool)
.await;
match update_result {
Ok(_) => Ok(()),
Err(error) => Err(crate::KhbbError::Database {
context: "update listener session status",
message: error.to_string(),
}),
}
}
pub(crate) async fn insert_raw_http_rpc_message(
pool: &sqlx::SqlitePool,
session_id: i64,
request_id: i64,
method: &str,
request_body: &str,
response_body: &str,
status: &str,
) -> core::result::Result<(), crate::KhbbError> {
let now = chrono::Utc::now().to_rfc3339();
let query_result = sqlx::query(
r#"
INSERT INTO raw_http_rpc_messages (
listener_session_id,
request_id,
method,
request_body,
response_body,
status,
created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(session_id)
.bind(request_id)
.bind(method)
.bind(request_body)
.bind(response_body)
.bind(status)
.bind(now)
.execute(pool)
.await;
match query_result {
Ok(_) => Ok(()),
Err(error) => Err(crate::KhbbError::Database {
context: "insert raw http rpc message",
message: error.to_string(),
}),
}
}
pub(crate) async fn insert_raw_ws_message(
pool: &sqlx::SqlitePool,
session_id: i64,
direction: &str,
message_text: &str,
) -> core::result::Result<(), crate::KhbbError> {
let now = chrono::Utc::now().to_rfc3339();
let insert_result = sqlx::query(
r#"
INSERT INTO raw_ws_messages (
listener_session_id,
direction,
message_text,
created_at
) VALUES (?1, ?2, ?3, ?4);
"#,
)
.bind(session_id)
.bind(direction)
.bind(message_text)
.bind(now)
.execute(pool)
.await;
match insert_result {
Ok(_) => Ok(()),
Err(error) => Err(crate::KhbbError::Database {
context: "insert raw websocket message",
message: error.to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn build_test_config(database_url: std::string::String) -> crate::KhbbAppConfig {
crate::KhbbAppConfig {
database_url,
solana_http_rpc_url: std::string::String::from(
"https://mainnet.helius-rpc.com/?api-key=test",
),
solana_ws_rpc_url: std::string::String::from(
"wss://mainnet.helius-rpc.com/?api-key=test",
),
yellowstone_grpc_url: Some(std::string::String::from(
"https://mainnet.helius-rpc.com:443",
)),
log_filter: std::string::String::from("info"),
bootstrap_database: true,
listener_max_ticks: 3,
listener_poll_interval_ms: 1000,
enable_ws_slot_subscribe: true,
enable_ws_logs_subscribe: true,
enable_ws_program_subscribe: false,
ws_program_subscribe_program_ids: vec![],
}
}
fn build_temp_sqlite_url() -> std::string::String {
let temp_dir =
std::env::temp_dir().join(std::format!("khbb_storage_test_{}", uuid::Uuid::new_v4()));
let db_path = temp_dir.join("app.db");
std::format!("sqlite://{}", db_path.to_string_lossy())
}
#[test]
fn extract_sqlite_file_path_accepts_memory_url() {
let result = super::extract_sqlite_file_path("sqlite::memory:");
assert!(result.is_ok());
let path = result.expect("extract memory path");
assert!(path.is_none());
}
#[test]
fn extract_sqlite_file_path_accepts_file_url() {
let result = super::extract_sqlite_file_path("sqlite://./dbdata/app.db");
assert!(result.is_ok());
let path = result.expect("extract file path");
assert!(path.is_some());
}
#[test]
fn extract_sqlite_file_path_rejects_invalid_scheme() {
let result = super::extract_sqlite_file_path("postgres://localhost/test");
assert!(result.is_err());
}
#[tokio::test]
async fn create_sqlite_pool_creates_file_database() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let ping_result = sqlx::query("SELECT 1;").execute(&pool).await;
assert!(ping_result.is_ok());
}
#[tokio::test]
async fn ensure_sqlite_schema_creates_tables() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let schema_result = crate::ensure_sqlite_schema(&pool).await;
assert!(schema_result.is_ok());
let query_result = sqlx::query(
r#"
SELECT name
FROM sqlite_master
WHERE type = 'table'
AND name = 'listener_sessions';
"#,
)
.fetch_one(&pool)
.await;
assert!(query_result.is_ok());
}
#[tokio::test]
async fn insert_listener_session_inserts_row() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let schema_result = crate::ensure_sqlite_schema(&pool).await;
assert!(schema_result.is_ok());
let config = build_test_config(database_url);
let insert_result = super::insert_listener_session(&pool, &config).await;
assert!(insert_result.is_ok());
let session = insert_result.expect("insert listener session");
assert!(session.id > 0);
assert_eq!(session.status, "running");
}
#[tokio::test]
async fn update_listener_session_status_updates_row() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let schema_result = crate::ensure_sqlite_schema(&pool).await;
assert!(schema_result.is_ok());
let config = build_test_config(database_url);
let insert_result = super::insert_listener_session(&pool, &config).await;
assert!(insert_result.is_ok());
let session = insert_result.expect("insert listener session");
let update_result =
super::update_listener_session_status(&pool, session.id, "stopped").await;
assert!(update_result.is_ok());
let fetch_result = sqlx::query_scalar::<_, std::string::String>(
r#"
SELECT status
FROM listener_sessions
WHERE id = ?1;
"#,
)
.bind(session.id)
.fetch_one(&pool)
.await;
assert!(fetch_result.is_ok());
let status = fetch_result.expect("fetch updated status");
assert_eq!(status, "stopped");
}
#[tokio::test]
async fn insert_raw_http_rpc_message_inserts_row() {
let pool = create_sqlite_pool("sqlite::memory:").await.expect("pool");
ensure_sqlite_schema(&pool).await.expect("schema");
let session = insert_listener_session(
&pool,
&crate::KhbbAppConfig {
database_url: "sqlite::memory:".into(),
solana_http_rpc_url: "http://localhost".into(),
solana_ws_rpc_url: "ws://localhost".into(),
yellowstone_grpc_url: None,
log_filter: "info".into(),
bootstrap_database: false,
listener_max_ticks: 3,
listener_poll_interval_ms: 1000,
enable_ws_slot_subscribe: true,
enable_ws_logs_subscribe: true,
enable_ws_program_subscribe: false,
ws_program_subscribe_program_ids: vec![],
},
)
.await
.expect("session");
let result =
insert_raw_http_rpc_message(&pool, session.id, 1, "getSlot", "{}", "{}", "ok").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn insert_raw_ws_message_inserts_row() {
let database_url = build_temp_sqlite_url();
let pool_result = crate::create_sqlite_pool(&database_url).await;
assert!(pool_result.is_ok());
let pool = pool_result.expect("create sqlite pool");
let schema_result = crate::ensure_sqlite_schema(&pool).await;
assert!(schema_result.is_ok());
let config = build_test_config(database_url);
let insert_session_result = super::insert_listener_session(&pool, &config).await;
assert!(insert_session_result.is_ok());
let session = insert_session_result.expect("insert listener session");
let insert_ws_result = super::insert_raw_ws_message(
&pool,
session.id,
"incoming",
r#"{"jsonrpc":"2.0","method":"slotNotification"}"#,
)
.await;
assert!(insert_ws_result.is_ok());
}
}