From fbd4d5d6efd1c8780bfd309a1684e4962e4f813b Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sat, 25 Apr 2026 05:43:13 +0200 Subject: [PATCH] 0.5.6 --- CHANGELOG.md | 4 +- Cargo.toml | 2 +- ROADMAP.md | 12 +- kb_lib/src/db.rs | 8 + kb_lib/src/db/queries.rs | 12 + kb_lib/src/db/queries/dex.rs | 95 +++ kb_lib/src/db/queries/pair.rs | 198 +++++- kb_lib/src/db/queries/pool.rs | 154 +++++ kb_lib/src/db/queries/pool_listing.rs | 180 ++++-- kb_lib/src/db/queries/pool_token.rs | 129 +++- kb_lib/src/db/schema.rs | 876 +++++++++++++++++++++++--- kb_lib/src/lib.rs | 8 + 12 files changed, 1526 insertions(+), 152 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d089f00..6016836 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,4 +21,6 @@ 0.5.2 - Ajout de la table des tokens observés, de leur statut local et des premières requêtes de persistance associées 0.5.3 - Préparation du stockage local des événements techniques et des signaux utiles à l’analyse, avec distinction runtime / on-chain / métier 0.5.4 - Ajout du modèle métier normalisé initial pour les DEX, tokens, pools, paires, composition des pools et listings -0.5.5 - Activité métier normalisé +0.5.5 - Ajout des événements métier normalisés pour les swaps, liquidités, mints et burns de tokens +0.5.6 - Consolidation de la couche stockage : activation des foreign keys SQLite, lectures ciblées sur le modèle métier normalisé, index supplémentaires et tests unitaires dédiés + diff --git a/Cargo.toml b/Cargo.toml index d159dd1..43971cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.5.5" +version = "0.5.6" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index aa535f1..1715e08 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -365,14 +365,12 @@ Réalisé : - éviter que la détection technique `0.6.x` écrive directement dans des tables trop brutes ou ambiguës. ### 6.23. Version `0.5.5` — Activité métier normalisée -Objectif : poser les premiers événements métier exploitables. +Réalisé : -À faire : - -- ajouter les tables de swaps, -- ajouter les événements de liquidité, -- ajouter les événements de mint et burn utiles au suivi des tokens, -- préparer l’historique métier nécessaire avant l’arrivée des connecteurs DEX complets. +- ajout des tables de swaps, +- ajout des événements de liquidité, +- ajout des événements de mint et burn utiles au suivi des tokens, +- préparation de l’historique métier nécessaire avant l’arrivée des connecteurs DEX complets. ### 6.24. Version `0.5.6` — Consolidation de la couche stockage Objectif : stabiliser le schéma avant la détection technique réelle. diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index cb2a21f..f7861e3 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -83,6 +83,14 @@ pub use crate::db::queries::upsert_swap; pub use crate::db::queries::upsert_token; pub use crate::db::queries::upsert_token_burn_event; pub use crate::db::queries::upsert_token_mint_event; +pub use crate::db::queries::get_dex_by_code; +pub use crate::db::queries::get_pair_by_pool_id; +pub use crate::db::queries::get_pool_by_address; +pub use crate::db::queries::get_pool_listing_by_pool_id; +pub use crate::db::queries::list_pairs; +pub use crate::db::queries::list_pool_listings; +pub use crate::db::queries::list_pool_tokens_by_pool_id; +pub use crate::db::queries::list_pools; pub use crate::db::types::KbAnalysisSignalSeverity; pub use crate::db::types::KbDatabaseBackend; pub use crate::db::types::KbDbRuntimeEventLevel; diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index a8909dc..65d5c06 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -2,6 +2,10 @@ //! Database queries. +// file: kb_lib/src/db/queries.rs + +//! Database queries. + mod analysis_signal; mod db_metadata; mod db_runtime_event; @@ -27,6 +31,7 @@ pub use crate::db::queries::db_metadata::list_db_metadata; pub use crate::db::queries::db_metadata::upsert_db_metadata; pub use crate::db::queries::db_runtime_event::insert_db_runtime_event; pub use crate::db::queries::db_runtime_event::list_recent_db_runtime_events; +pub use crate::db::queries::dex::get_dex_by_code; pub use crate::db::queries::dex::list_dexes; pub use crate::db::queries::dex::upsert_dex; pub use crate::db::queries::known_http_endpoint::get_known_http_endpoint; @@ -42,9 +47,16 @@ pub use crate::db::queries::observed_token::list_observed_tokens; pub use crate::db::queries::observed_token::upsert_observed_token; pub use crate::db::queries::onchain_observation::insert_onchain_observation; pub use crate::db::queries::onchain_observation::list_recent_onchain_observations; +pub use crate::db::queries::pair::get_pair_by_pool_id; +pub use crate::db::queries::pair::list_pairs; pub use crate::db::queries::pair::upsert_pair; +pub use crate::db::queries::pool::get_pool_by_address; +pub use crate::db::queries::pool::list_pools; pub use crate::db::queries::pool::upsert_pool; +pub use crate::db::queries::pool_listing::get_pool_listing_by_pool_id; +pub use crate::db::queries::pool_listing::list_pool_listings; pub use crate::db::queries::pool_listing::upsert_pool_listing; +pub use crate::db::queries::pool_token::list_pool_tokens_by_pool_id; pub use crate::db::queries::pool_token::upsert_pool_token; pub use crate::db::queries::swap::list_recent_swaps; pub use crate::db::queries::swap::upsert_swap; diff --git a/kb_lib/src/db/queries/dex.rs b/kb_lib/src/db/queries/dex.rs index f73018d..8914e98 100644 --- a/kb_lib/src/db/queries/dex.rs +++ b/kb_lib/src/db/queries/dex.rs @@ -66,6 +66,55 @@ LIMIT 1 } } +/// Reads one normalized DEX row by code. +pub async fn get_dex_by_code( + database: &crate::KbDatabase, + code: &str, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + code, + name, + program_id, + router_program_id, + is_enabled, + created_at, + updated_at +FROM kb_dexes +WHERE code = ? +LIMIT 1 + "#, + ) + .bind(code) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot read kb_dexes '{}' on sqlite: {}", + code, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbDexDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + /// Lists normalized DEX rows. pub async fn list_dexes( database: &crate::KbDatabase, @@ -111,3 +160,49 @@ ORDER BY code ASC } } } + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn dex_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("dex_roundtrip.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dex_id = crate::upsert_dex( + &database, + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + None, + None, + true, + ), + ) + .await + .expect("dex upsert must succeed"); + assert!(dex_id > 0); + let dex = crate::get_dex_by_code(&database, "raydium") + .await + .expect("get dex must succeed"); + assert!(dex.is_some()); + assert_eq!(dex.expect("dex must exist").name, "Raydium"); + let dexes = crate::list_dexes(&database) + .await + .expect("list dexes must succeed"); + assert_eq!(dexes.len(), 1); + } +} diff --git a/kb_lib/src/db/queries/pair.rs b/kb_lib/src/db/queries/pair.rs index 5931c46..b046ea0 100644 --- a/kb_lib/src/db/queries/pair.rs +++ b/kb_lib/src/db/queries/pair.rs @@ -59,10 +59,202 @@ LIMIT 1 Ok(id) => Ok(id), Err(error) => Err(crate::KbError::Db(format!( "cannot fetch kb_pairs id for pool_id '{}' on sqlite: {}", - dto.pool_id, - error + dto.pool_id, error ))), } - }, + } + } +} + +/// Reads one normalized pair row by pool id. +pub async fn get_pair_by_pool_id( + database: &crate::KbDatabase, + pool_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + base_token_id, + quote_token_id, + symbol, + first_seen_at, + updated_at +FROM kb_pairs +WHERE pool_id = ? +LIMIT 1 + "#, + ) + .bind(pool_id) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot read kb_pairs by pool_id '{}' on sqlite: {}", + pool_id, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbPairDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists normalized pairs ordered by id ascending. +pub async fn list_pairs( + database: &crate::KbDatabase, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + base_token_id, + quote_token_id, + symbol, + first_seen_at, + updated_at +FROM kb_pairs +ORDER BY id ASC + "#, + ) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_pairs on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbPairDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn pair_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("pair_roundtrip.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dex_id = crate::upsert_dex( + &database, + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + None, + None, + true, + ), + ) + .await + .expect("dex upsert must succeed"); + let base_token_id = crate::upsert_token( + &database, + &crate::KbTokenDto::new( + "Base111111111111111111111111111111111111111".to_string(), + Some("BASE".to_string()), + Some("Base Token".to_string()), + Some(6), + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + false, + ), + ) + .await + .expect("base token upsert must succeed"); + let quote_token_id = crate::upsert_token( + &database, + &crate::KbTokenDto::new( + "So11111111111111111111111111111111111111112".to_string(), + Some("WSOL".to_string()), + Some("Wrapped SOL".to_string()), + Some(9), + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + true, + ), + ) + .await + .expect("quote token upsert must succeed"); + let pool_id = crate::upsert_pool( + &database, + &crate::KbPoolDto::new( + dex_id, + "Pool111111111111111111111111111111111111111".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ), + ) + .await + .expect("pool upsert must succeed"); + let pair_id = crate::upsert_pair( + &database, + &crate::KbPairDto::new( + dex_id, + pool_id, + base_token_id, + quote_token_id, + Some("BASE/WSOL".to_string()), + ), + ) + .await + .expect("pair upsert must succeed"); + assert!(pair_id > 0); + let pair = crate::get_pair_by_pool_id(&database, pool_id) + .await + .expect("get pair must succeed"); + assert!(pair.is_some()); + assert_eq!( + pair.expect("pair must exist").symbol.as_deref(), + Some("BASE/WSOL") + ); + let pairs = crate::list_pairs(&database) + .await + .expect("list pairs must succeed"); + assert_eq!(pairs.len(), 1); } } diff --git a/kb_lib/src/db/queries/pool.rs b/kb_lib/src/db/queries/pool.rs index b06393b..076e6b2 100644 --- a/kb_lib/src/db/queries/pool.rs +++ b/kb_lib/src/db/queries/pool.rs @@ -62,3 +62,157 @@ LIMIT 1 } } } + +/// Reads one normalized pool row by address. +pub async fn get_pool_by_address( + database: &crate::KbDatabase, + address: &str, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + address, + pool_kind, + status, + first_seen_at, + updated_at +FROM kb_pools +WHERE address = ? +LIMIT 1 + "#, + ) + .bind(address) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot read kb_pools '{}' on sqlite: {}", + address, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbPoolDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists normalized pools ordered by id ascending. +pub async fn list_pools( + database: &crate::KbDatabase, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + address, + pool_kind, + status, + first_seen_at, + updated_at +FROM kb_pools +ORDER BY id ASC + "#, + ) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_pools on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbPoolDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn pool_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("pool_roundtrip.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dex_id = crate::upsert_dex( + &database, + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + None, + None, + true, + ), + ) + .await + .expect("dex upsert must succeed"); + let pool_id = crate::upsert_pool( + &database, + &crate::KbPoolDto::new( + dex_id, + "Pool111111111111111111111111111111111111111".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ), + ) + .await + .expect("pool upsert must succeed"); + assert!(pool_id > 0); + let pool = + crate::get_pool_by_address(&database, "Pool111111111111111111111111111111111111111") + .await + .expect("get pool must succeed"); + assert!(pool.is_some()); + assert_eq!( + pool.expect("pool must exist").pool_kind, + crate::KbPoolKind::Amm + ); + let pools = crate::list_pools(&database) + .await + .expect("list pools must succeed"); + assert_eq!(pools.len(), 1); + } +} diff --git a/kb_lib/src/db/queries/pool_listing.rs b/kb_lib/src/db/queries/pool_listing.rs index ff49fc0..eef7717 100644 --- a/kb_lib/src/db/queries/pool_listing.rs +++ b/kb_lib/src/db/queries/pool_listing.rs @@ -76,12 +76,113 @@ LIMIT 1 } } +/// Reads one normalized pool listing row by pool id. +pub async fn get_pool_listing_by_pool_id( + database: &crate::KbDatabase, + pool_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + pair_id, + source_kind, + source_endpoint_name, + detected_at, + initial_base_reserve, + initial_quote_reserve, + initial_price_quote, + updated_at +FROM kb_pool_listings +WHERE pool_id = ? +LIMIT 1 + "#, + ) + .bind(pool_id) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot read kb_pool_listings by pool_id '{}' on sqlite: {}", + pool_id, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbPoolListingDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists normalized pool listings ordered by detected date then id. +pub async fn list_pool_listings( + database: &crate::KbDatabase, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + pair_id, + source_kind, + source_endpoint_name, + detected_at, + initial_base_reserve, + initial_quote_reserve, + initial_price_quote, + updated_at +FROM kb_pool_listings +ORDER BY detected_at ASC, id ASC + "#, + ) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_pool_listings on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbPoolListingDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + #[cfg(test)] mod tests { #[tokio::test] - async fn normalized_model_roundtrip_works() { + async fn pool_listing_roundtrip_works() { let tempdir = tempfile::tempdir().expect("tempdir must succeed"); - let database_path = tempdir.path().join("normalized_model.sqlite3"); + let database_path = tempdir.path().join("pool_listing_roundtrip.sqlite3"); let config = crate::KbDatabaseConfig { enabled: true, backend: crate::KbDatabaseBackend::Sqlite, @@ -112,12 +213,12 @@ mod tests { let base_token_id = crate::upsert_token( &database, &crate::KbTokenDto::new( - "So11111111111111111111111111111111111111112".to_string(), - Some("WSOL".to_string()), - Some("Wrapped SOL".to_string()), - Some(9), + "Base111111111111111111111111111111111111111".to_string(), + Some("BASE".to_string()), + Some("Base Token".to_string()), + Some(6), crate::SPL_TOKEN_PROGRAM_ID.to_string(), - true, + false, ), ) .await @@ -125,12 +226,12 @@ mod tests { let quote_token_id = crate::upsert_token( &database, &crate::KbTokenDto::new( - "DezX111111111111111111111111111111111111111".to_string(), - Some("TEST".to_string()), - Some("Test Token".to_string()), - Some(6), + "So11111111111111111111111111111111111111112".to_string(), + Some("WSOL".to_string()), + Some("Wrapped SOL".to_string()), + Some(9), crate::SPL_TOKEN_PROGRAM_ID.to_string(), - false, + true, ), ) .await @@ -151,38 +252,14 @@ mod tests { &crate::KbPairDto::new( dex_id, pool_id, - quote_token_id, base_token_id, - Some("TEST/WSOL".to_string()), + quote_token_id, + Some("BASE/WSOL".to_string()), ), ) .await .expect("pair upsert must succeed"); - let _pool_token_a_id = crate::upsert_pool_token( - &database, - &crate::KbPoolTokenDto::new( - pool_id, - quote_token_id, - crate::KbPoolTokenRole::Base, - None, - Some(0), - ), - ) - .await - .expect("pool token a upsert must succeed"); - let _pool_token_b_id = crate::upsert_pool_token( - &database, - &crate::KbPoolTokenDto::new( - pool_id, - base_token_id, - crate::KbPoolTokenRole::Quote, - None, - Some(1), - ), - ) - .await - .expect("pool token b upsert must succeed"); - let listing_id = crate::upsert_pool_listing( + let pool_listing_id = crate::upsert_pool_listing( &database, &crate::KbPoolListingDto::new( dex_id, @@ -191,21 +268,24 @@ mod tests { crate::KbObservationSourceKind::WsRpc, Some("mainnet_public_ws_slots".to_string()), Some(1000.0), - Some(42.0), - Some(0.042), + Some(10.0), + Some(0.01), ), ) .await .expect("pool listing upsert must succeed"); - assert!(listing_id > 0); - let dexes = crate::list_dexes(&database) + assert!(pool_listing_id > 0); + let pool_listing = crate::get_pool_listing_by_pool_id(&database, pool_id) .await - .expect("dex list must succeed"); - assert_eq!(dexes.len(), 1); - let token = - crate::get_token_by_mint(&database, "So11111111111111111111111111111111111111112") - .await - .expect("token get must succeed"); - assert!(token.is_some()); + .expect("get pool listing must succeed"); + assert!(pool_listing.is_some()); + assert_eq!( + pool_listing.expect("pool listing must exist").source_kind, + crate::KbObservationSourceKind::WsRpc + ); + let pool_listings = crate::list_pool_listings(&database) + .await + .expect("list pool listings must succeed"); + assert_eq!(pool_listings.len(), 1); } } diff --git a/kb_lib/src/db/queries/pool_token.rs b/kb_lib/src/db/queries/pool_token.rs index fe7df84..c15fe56 100644 --- a/kb_lib/src/db/queries/pool_token.rs +++ b/kb_lib/src/db/queries/pool_token.rs @@ -62,6 +62,133 @@ LIMIT 1 error ))), } - }, + } + } +} + +/// Lists normalized pool token rows by pool id. +pub async fn list_pool_tokens_by_pool_id( + database: &crate::KbDatabase, + pool_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + pool_id, + token_id, + role, + vault_address, + token_order, + created_at, + updated_at +FROM kb_pool_tokens +WHERE pool_id = ? +ORDER BY token_order ASC, id ASC + "#, + ) + .bind(pool_id) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_pool_tokens for pool_id '{}' on sqlite: {}", + pool_id, error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbPoolTokenDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn pool_token_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("pool_token_roundtrip.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dex_id = crate::upsert_dex( + &database, + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + None, + None, + true, + ), + ) + .await + .expect("dex upsert must succeed"); + let token_id = crate::upsert_token( + &database, + &crate::KbTokenDto::new( + "Base111111111111111111111111111111111111111".to_string(), + Some("BASE".to_string()), + Some("Base Token".to_string()), + Some(6), + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + false, + ), + ) + .await + .expect("token upsert must succeed"); + let pool_id = crate::upsert_pool( + &database, + &crate::KbPoolDto::new( + dex_id, + "Pool111111111111111111111111111111111111111".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ), + ) + .await + .expect("pool upsert must succeed"); + let pool_token_id = crate::upsert_pool_token( + &database, + &crate::KbPoolTokenDto::new( + pool_id, + token_id, + crate::KbPoolTokenRole::Base, + Some("Vault111111111111111111111111111111111111".to_string()), + Some(0), + ), + ) + .await + .expect("pool token upsert must succeed"); + assert!(pool_token_id > 0); + let pool_tokens = crate::list_pool_tokens_by_pool_id(&database, pool_id) + .await + .expect("list pool tokens must succeed"); + assert_eq!(pool_tokens.len(), 1); + assert_eq!(pool_tokens[0].role, crate::KbPoolTokenRole::Base); } } diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index e640b3f..2068f29 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -6,15 +6,239 @@ pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), crate::KbError> { match database.connection() { crate::KbDatabaseConnection::Sqlite(pool) => { - let statements = vec![ - r#" + let result = create_kb_db_metadata_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_known_http_endpoints_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_known_ws_endpoints_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_db_runtime_events_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_db_runtime_events_created_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_observed_tokens_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_observed_tokens_mint(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_observed_tokens_status(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_onchain_observations_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_onchain_observations_object_key(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_onchain_observations_observed_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_analysis_signals_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_analysis_signals_object_key(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_analysis_signals_created_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_dexes_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_tokens_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_tokens_token_program(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_tokens_is_quote_token(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_pools_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pools_dex_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_pairs_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pairs_dex_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pairs_base_token_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pairs_quote_token_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_pool_tokens_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pool_tokens_pool_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pool_tokens_token_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_pool_listings_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pool_listings_detected_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pool_listings_dex_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_swaps_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_swaps_pool_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_swaps_executed_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_swaps_pair_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_swaps_slot(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_liquidity_events_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_liquidity_events_pool_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_liquidity_events_executed_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_liquidity_events_pair_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_liquidity_events_slot(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_token_mint_events_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_token_mint_events_token_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_token_mint_events_executed_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_token_burn_events_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_token_burn_events_token_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_token_burn_events_executed_at(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = update_schema_version_metadata(database).await; + if let Err(error) = result { + return Err(error); + } + Ok(()) + } + } +} + +/// Executes one SQLite schema statement. +async fn execute_sqlite_schema_statement( + pool: &sqlx::SqlitePool, + statement_name: &str, + statement_sql: &str, +) -> Result<(), crate::KbError> { + let execute_result = sqlx::query(statement_sql).execute(pool).await; + match execute_result { + Ok(_) => Ok(()), + Err(error) => Err(crate::KbError::Db(format!( + "cannot initialize sqlite schema statement '{}': {}", + statement_name, error + ))), + } +} + +/// Creates `kb_db_metadata`. +async fn create_kb_db_metadata_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_db_metadata_table", + r#" CREATE TABLE IF NOT EXISTS kb_db_metadata ( key TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_known_http_endpoints`. +async fn create_kb_known_http_endpoints_table( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_known_http_endpoints_table", + r#" CREATE TABLE IF NOT EXISTS kb_known_http_endpoints ( name TEXT NOT NULL PRIMARY KEY, provider TEXT NOT NULL, @@ -24,8 +248,17 @@ CREATE TABLE IF NOT EXISTS kb_known_http_endpoints ( last_seen_at TEXT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_known_ws_endpoints`. +async fn create_kb_known_ws_endpoints_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_known_ws_endpoints_table", + r#" CREATE TABLE IF NOT EXISTS kb_known_ws_endpoints ( name TEXT NOT NULL PRIMARY KEY, provider TEXT NOT NULL, @@ -35,8 +268,17 @@ CREATE TABLE IF NOT EXISTS kb_known_ws_endpoints ( last_seen_at TEXT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_db_runtime_events`. +async fn create_kb_db_runtime_events_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_db_runtime_events_table", + r#" CREATE TABLE IF NOT EXISTS kb_db_runtime_events ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, event_kind TEXT NOT NULL, @@ -45,12 +287,32 @@ CREATE TABLE IF NOT EXISTS kb_db_runtime_events ( message TEXT NOT NULL, created_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_db_runtime_events(created_at)`. +async fn create_kb_idx_db_runtime_events_created_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_db_runtime_events_created_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_db_runtime_events_created_at ON kb_db_runtime_events (created_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_observed_tokens`. +async fn create_kb_observed_tokens_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_observed_tokens_table", + r#" CREATE TABLE IF NOT EXISTS kb_observed_tokens ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, mint TEXT NOT NULL UNIQUE, @@ -63,16 +325,47 @@ CREATE TABLE IF NOT EXISTS kb_observed_tokens ( last_seen_at TEXT NOT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates unique index on `kb_observed_tokens(mint)`. +async fn create_kb_idx_observed_tokens_mint(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_observed_tokens_mint", + r#" CREATE UNIQUE INDEX IF NOT EXISTS kb_idx_observed_tokens_mint ON kb_observed_tokens (mint) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_observed_tokens(status)`. +async fn create_kb_idx_observed_tokens_status( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_observed_tokens_status", + r#" CREATE INDEX IF NOT EXISTS kb_idx_observed_tokens_status ON kb_observed_tokens (status) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_onchain_observations`. +async fn create_kb_onchain_observations_table( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_onchain_observations_table", + r#" CREATE TABLE IF NOT EXISTS kb_onchain_observations ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, observation_kind TEXT NOT NULL, @@ -83,16 +376,47 @@ CREATE TABLE IF NOT EXISTS kb_onchain_observations ( payload_json TEXT NOT NULL, observed_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_onchain_observations(object_key)`. +async fn create_kb_idx_onchain_observations_object_key( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_onchain_observations_object_key", + r#" CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_object_key ON kb_onchain_observations (object_key) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_onchain_observations(observed_at)`. +async fn create_kb_idx_onchain_observations_observed_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_onchain_observations_observed_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_observed_at ON kb_onchain_observations (observed_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_analysis_signals`. +async fn create_kb_analysis_signals_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_analysis_signals_table", + r#" CREATE TABLE IF NOT EXISTS kb_analysis_signals ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, signal_kind TEXT NOT NULL, @@ -104,16 +428,47 @@ CREATE TABLE IF NOT EXISTS kb_analysis_signals ( created_at TEXT NOT NULL, FOREIGN KEY(related_observation_id) REFERENCES kb_onchain_observations(id) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_analysis_signals(object_key)`. +async fn create_kb_idx_analysis_signals_object_key( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_analysis_signals_object_key", + r#" CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_object_key ON kb_analysis_signals (object_key) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_analysis_signals(created_at)`. +async fn create_kb_idx_analysis_signals_created_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_analysis_signals_created_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_created_at ON kb_analysis_signals (created_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_dexes`. +async fn create_kb_dexes_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_dexes_table", + r#" CREATE TABLE IF NOT EXISTS kb_dexes ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL UNIQUE, @@ -124,8 +479,17 @@ CREATE TABLE IF NOT EXISTS kb_dexes ( created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_tokens`. +async fn create_kb_tokens_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_tokens_table", + r#" CREATE TABLE IF NOT EXISTS kb_tokens ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, mint TEXT NOT NULL UNIQUE, @@ -137,8 +501,45 @@ CREATE TABLE IF NOT EXISTS kb_tokens ( first_seen_at TEXT NOT NULL, updated_at TEXT NOT NULL ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_tokens(token_program)`. +async fn create_kb_idx_tokens_token_program(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_tokens_token_program", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_tokens_token_program +ON kb_tokens (token_program) + "#, + ) + .await +} + +/// Creates index on `kb_tokens(is_quote_token)`. +async fn create_kb_idx_tokens_is_quote_token( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_tokens_is_quote_token", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_tokens_is_quote_token +ON kb_tokens (is_quote_token) + "#, + ) + .await +} + +/// Creates `kb_pools`. +async fn create_kb_pools_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_pools_table", + r#" CREATE TABLE IF NOT EXISTS kb_pools ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, dex_id INTEGER NOT NULL, @@ -149,12 +550,30 @@ CREATE TABLE IF NOT EXISTS kb_pools ( updated_at TEXT NOT NULL, FOREIGN KEY(dex_id) REFERENCES kb_dexes(id) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_pools(dex_id)`. +async fn create_kb_idx_pools_dex_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pools_dex_id", + r#" CREATE INDEX IF NOT EXISTS kb_idx_pools_dex_id ON kb_pools (dex_id) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_pairs`. +async fn create_kb_pairs_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_pairs_table", + r#" CREATE TABLE IF NOT EXISTS kb_pairs ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, dex_id INTEGER NOT NULL, @@ -169,8 +588,56 @@ CREATE TABLE IF NOT EXISTS kb_pairs ( FOREIGN KEY(base_token_id) REFERENCES kb_tokens(id), FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_pairs(dex_id)`. +async fn create_kb_idx_pairs_dex_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pairs_dex_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pairs_dex_id +ON kb_pairs (dex_id) + "#, + ) + .await +} + +/// Creates index on `kb_pairs(base_token_id)`. +async fn create_kb_idx_pairs_base_token_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pairs_base_token_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pairs_base_token_id +ON kb_pairs (base_token_id) + "#, + ) + .await +} + +/// Creates index on `kb_pairs(quote_token_id)`. +async fn create_kb_idx_pairs_quote_token_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pairs_quote_token_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pairs_quote_token_id +ON kb_pairs (quote_token_id) + "#, + ) + .await +} + +/// Creates `kb_pool_tokens`. +async fn create_kb_pool_tokens_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_pool_tokens_table", + r#" CREATE TABLE IF NOT EXISTS kb_pool_tokens ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, pool_id INTEGER NOT NULL, @@ -184,8 +651,43 @@ CREATE TABLE IF NOT EXISTS kb_pool_tokens ( FOREIGN KEY(token_id) REFERENCES kb_tokens(id), UNIQUE(pool_id, token_id, role) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_pool_tokens(pool_id)`. +async fn create_kb_idx_pool_tokens_pool_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pool_tokens_pool_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pool_tokens_pool_id +ON kb_pool_tokens (pool_id) + "#, + ) + .await +} + +/// Creates index on `kb_pool_tokens(token_id)`. +async fn create_kb_idx_pool_tokens_token_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pool_tokens_token_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pool_tokens_token_id +ON kb_pool_tokens (token_id) + "#, + ) + .await +} + +/// Creates `kb_pool_listings`. +async fn create_kb_pool_listings_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_pool_listings_table", + r#" CREATE TABLE IF NOT EXISTS kb_pool_listings ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, dex_id INTEGER NOT NULL, @@ -202,12 +704,45 @@ CREATE TABLE IF NOT EXISTS kb_pool_listings ( FOREIGN KEY(pool_id) REFERENCES kb_pools(id), FOREIGN KEY(pair_id) REFERENCES kb_pairs(id) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_pool_listings(detected_at)`. +async fn create_kb_idx_pool_listings_detected_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pool_listings_detected_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_pool_listings_detected_at ON kb_pool_listings (detected_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_pool_listings(dex_id)`. +async fn create_kb_idx_pool_listings_dex_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pool_listings_dex_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pool_listings_dex_id +ON kb_pool_listings (dex_id) + "#, + ) + .await +} + +/// Creates `kb_swaps`. +async fn create_kb_swaps_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_swaps_table", + r#" CREATE TABLE IF NOT EXISTS kb_swaps ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, dex_id INTEGER NOT NULL, @@ -231,16 +766,69 @@ CREATE TABLE IF NOT EXISTS kb_swaps ( FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id), UNIQUE(signature, instruction_index) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_swaps(pool_id)`. +async fn create_kb_idx_swaps_pool_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_swaps_pool_id", + r#" CREATE INDEX IF NOT EXISTS kb_idx_swaps_pool_id ON kb_swaps (pool_id) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_swaps(executed_at)`. +async fn create_kb_idx_swaps_executed_at(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_swaps_executed_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_swaps_executed_at ON kb_swaps (executed_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_swaps(pair_id)`. +async fn create_kb_idx_swaps_pair_id(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_swaps_pair_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_swaps_pair_id +ON kb_swaps (pair_id) + "#, + ) + .await +} + +/// Creates index on `kb_swaps(slot)`. +async fn create_kb_idx_swaps_slot(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_swaps_slot", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_swaps_slot +ON kb_swaps (slot) + "#, + ) + .await +} + +/// Creates `kb_liquidity_events`. +async fn create_kb_liquidity_events_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_liquidity_events_table", + r#" CREATE TABLE IF NOT EXISTS kb_liquidity_events ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, dex_id INTEGER NOT NULL, @@ -266,16 +854,77 @@ CREATE TABLE IF NOT EXISTS kb_liquidity_events ( FOREIGN KEY(lp_token_id) REFERENCES kb_tokens(id), UNIQUE(signature, instruction_index) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_liquidity_events(pool_id)`. +async fn create_kb_idx_liquidity_events_pool_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_liquidity_events_pool_id", + r#" CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_pool_id ON kb_liquidity_events (pool_id) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_liquidity_events(executed_at)`. +async fn create_kb_idx_liquidity_events_executed_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_liquidity_events_executed_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_executed_at ON kb_liquidity_events (executed_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_liquidity_events(pair_id)`. +async fn create_kb_idx_liquidity_events_pair_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_liquidity_events_pair_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_pair_id +ON kb_liquidity_events (pair_id) + "#, + ) + .await +} + +/// Creates index on `kb_liquidity_events(slot)`. +async fn create_kb_idx_liquidity_events_slot( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_liquidity_events_slot", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_slot +ON kb_liquidity_events (slot) + "#, + ) + .await +} + +/// Creates `kb_token_mint_events`. +async fn create_kb_token_mint_events_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_token_mint_events_table", + r#" CREATE TABLE IF NOT EXISTS kb_token_mint_events ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, token_id INTEGER NOT NULL, @@ -290,16 +939,47 @@ CREATE TABLE IF NOT EXISTS kb_token_mint_events ( FOREIGN KEY(token_id) REFERENCES kb_tokens(id), UNIQUE(signature, instruction_index) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_token_mint_events(token_id)`. +async fn create_kb_idx_token_mint_events_token_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_token_mint_events_token_id", + r#" CREATE INDEX IF NOT EXISTS kb_idx_token_mint_events_token_id ON kb_token_mint_events (token_id) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_token_mint_events(executed_at)`. +async fn create_kb_idx_token_mint_events_executed_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_token_mint_events_executed_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_token_mint_events_executed_at ON kb_token_mint_events (executed_at) - "#, - r#" + "#, + ) + .await +} + +/// Creates `kb_token_burn_events`. +async fn create_kb_token_burn_events_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_token_burn_events_table", + r#" CREATE TABLE IF NOT EXISTS kb_token_burn_events ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, token_id INTEGER NOT NULL, @@ -314,34 +994,52 @@ CREATE TABLE IF NOT EXISTS kb_token_burn_events ( FOREIGN KEY(token_id) REFERENCES kb_tokens(id), UNIQUE(signature, instruction_index) ) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_token_burn_events(token_id)`. +async fn create_kb_idx_token_burn_events_token_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_token_burn_events_token_id", + r#" CREATE INDEX IF NOT EXISTS kb_idx_token_burn_events_token_id ON kb_token_burn_events (token_id) - "#, - r#" + "#, + ) + .await +} + +/// Creates index on `kb_token_burn_events(executed_at)`. +async fn create_kb_idx_token_burn_events_executed_at( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_token_burn_events_executed_at", + r#" CREATE INDEX IF NOT EXISTS kb_idx_token_burn_events_executed_at ON kb_token_burn_events (executed_at) - "#, - ]; - for statement in statements { - let execute_result = sqlx::query(statement).execute(pool).await; - if let Err(error) = execute_result { - return Err(crate::KbError::Db(format!( - "cannot initialize sqlite schema statement '{}': {}", - statement, error - ))); - } - } - let schema_version = crate::KbDbMetadataDto::new( - "schema_version".to_string(), - env!("CARGO_PKG_VERSION").to_string(), - ); - let upsert_result = crate::upsert_db_metadata(database, &schema_version).await; - if let Err(error) = upsert_result { - return Err(error); - } - Ok(()) - } + "#, + ) + .await +} + +/// Updates the persisted schema version metadata entry. +async fn update_schema_version_metadata( + database: &crate::KbDatabase, +) -> Result<(), crate::KbError> { + let schema_version = crate::KbDbMetadataDto::new( + "schema_version".to_string(), + env!("CARGO_PKG_VERSION").to_string(), + ); + let upsert_result = crate::upsert_db_metadata(database, &schema_version).await; + match upsert_result { + Ok(_) => Ok(()), + Err(error) => Err(error), } } diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index aa8e66e..4a5c4f2 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -143,3 +143,11 @@ pub use crate::db::list_known_ws_endpoints; pub use crate::db::list_recent_db_runtime_events; pub use crate::db::upsert_known_http_endpoint; pub use crate::db::upsert_known_ws_endpoint; +pub use crate::db::get_dex_by_code; +pub use crate::db::get_pair_by_pool_id; +pub use crate::db::get_pool_by_address; +pub use crate::db::get_pool_listing_by_pool_id; +pub use crate::db::list_pairs; +pub use crate::db::list_pool_listings; +pub use crate::db::list_pool_tokens_by_pool_id; +pub use crate::db::list_pools;