This commit is contained in:
2026-04-25 05:43:13 +02:00
parent 0ad6145091
commit fbd4d5d6ef
12 changed files with 1526 additions and 152 deletions

View File

@@ -21,4 +21,6 @@
0.5.2 - Ajout de la table des tokens observés, de leur statut local et des premières requêtes de persistance associées 0.5.2 - Ajout de la table des tokens observés, de leur statut local et des premières requêtes de persistance associées
0.5.3 - Préparation du stockage local des événements techniques et des signaux utiles à lanalyse, avec distinction runtime / on-chain / métier 0.5.3 - Préparation du stockage local des événements techniques et des signaux utiles à lanalyse, avec distinction runtime / on-chain / métier
0.5.4 - Ajout du modèle métier normalisé initial pour les DEX, tokens, pools, paires, composition des pools et listings 0.5.4 - Ajout du modèle métier normalisé initial pour les DEX, tokens, pools, paires, composition des pools et listings
0.5.5 - Activité métier normalisé 0.5.5 - Ajout des événements métier normalisés pour les swaps, liquidités, mints et burns de tokens
0.5.6 - Consolidation de la couche stockage : activation des foreign keys SQLite, lectures ciblées sur le modèle métier normalisé, index supplémentaires et tests unitaires dédiés

View File

@@ -8,7 +8,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.5.5" version = "0.5.6"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -365,14 +365,12 @@ Réalisé :
- éviter que la détection technique `0.6.x` écrive directement dans des tables trop brutes ou ambiguës. - éviter que la détection technique `0.6.x` écrive directement dans des tables trop brutes ou ambiguës.
### 6.23. Version `0.5.5` — Activité métier normalisée ### 6.23. Version `0.5.5` — Activité métier normalisée
Objectif : poser les premiers événements métier exploitables. Réalisé :
À faire : - ajout des tables de swaps,
- ajout des événements de liquidité,
- ajouter les tables de swaps, - ajout des événements de mint et burn utiles au suivi des tokens,
- ajouter les événements de liquidité, - préparation de lhistorique métier nécessaire avant larrivée des connecteurs DEX complets.
- ajouter les événements de mint et burn utiles au suivi des tokens,
- préparer lhistorique métier nécessaire avant larrivée des connecteurs DEX complets.
### 6.24. Version `0.5.6` — Consolidation de la couche stockage ### 6.24. Version `0.5.6` — Consolidation de la couche stockage
Objectif : stabiliser le schéma avant la détection technique réelle. Objectif : stabiliser le schéma avant la détection technique réelle.

View File

@@ -83,6 +83,14 @@ pub use crate::db::queries::upsert_swap;
pub use crate::db::queries::upsert_token; pub use crate::db::queries::upsert_token;
pub use crate::db::queries::upsert_token_burn_event; pub use crate::db::queries::upsert_token_burn_event;
pub use crate::db::queries::upsert_token_mint_event; pub use crate::db::queries::upsert_token_mint_event;
pub use crate::db::queries::get_dex_by_code;
pub use crate::db::queries::get_pair_by_pool_id;
pub use crate::db::queries::get_pool_by_address;
pub use crate::db::queries::get_pool_listing_by_pool_id;
pub use crate::db::queries::list_pairs;
pub use crate::db::queries::list_pool_listings;
pub use crate::db::queries::list_pool_tokens_by_pool_id;
pub use crate::db::queries::list_pools;
pub use crate::db::types::KbAnalysisSignalSeverity; pub use crate::db::types::KbAnalysisSignalSeverity;
pub use crate::db::types::KbDatabaseBackend; pub use crate::db::types::KbDatabaseBackend;
pub use crate::db::types::KbDbRuntimeEventLevel; pub use crate::db::types::KbDbRuntimeEventLevel;

View File

@@ -2,6 +2,10 @@
//! Database queries. //! Database queries.
// file: kb_lib/src/db/queries.rs
//! Database queries.
mod analysis_signal; mod analysis_signal;
mod db_metadata; mod db_metadata;
mod db_runtime_event; mod db_runtime_event;
@@ -27,6 +31,7 @@ pub use crate::db::queries::db_metadata::list_db_metadata;
pub use crate::db::queries::db_metadata::upsert_db_metadata; pub use crate::db::queries::db_metadata::upsert_db_metadata;
pub use crate::db::queries::db_runtime_event::insert_db_runtime_event; pub use crate::db::queries::db_runtime_event::insert_db_runtime_event;
pub use crate::db::queries::db_runtime_event::list_recent_db_runtime_events; pub use crate::db::queries::db_runtime_event::list_recent_db_runtime_events;
pub use crate::db::queries::dex::get_dex_by_code;
pub use crate::db::queries::dex::list_dexes; pub use crate::db::queries::dex::list_dexes;
pub use crate::db::queries::dex::upsert_dex; pub use crate::db::queries::dex::upsert_dex;
pub use crate::db::queries::known_http_endpoint::get_known_http_endpoint; pub use crate::db::queries::known_http_endpoint::get_known_http_endpoint;
@@ -42,9 +47,16 @@ pub use crate::db::queries::observed_token::list_observed_tokens;
pub use crate::db::queries::observed_token::upsert_observed_token; pub use crate::db::queries::observed_token::upsert_observed_token;
pub use crate::db::queries::onchain_observation::insert_onchain_observation; pub use crate::db::queries::onchain_observation::insert_onchain_observation;
pub use crate::db::queries::onchain_observation::list_recent_onchain_observations; pub use crate::db::queries::onchain_observation::list_recent_onchain_observations;
pub use crate::db::queries::pair::get_pair_by_pool_id;
pub use crate::db::queries::pair::list_pairs;
pub use crate::db::queries::pair::upsert_pair; pub use crate::db::queries::pair::upsert_pair;
pub use crate::db::queries::pool::get_pool_by_address;
pub use crate::db::queries::pool::list_pools;
pub use crate::db::queries::pool::upsert_pool; pub use crate::db::queries::pool::upsert_pool;
pub use crate::db::queries::pool_listing::get_pool_listing_by_pool_id;
pub use crate::db::queries::pool_listing::list_pool_listings;
pub use crate::db::queries::pool_listing::upsert_pool_listing; pub use crate::db::queries::pool_listing::upsert_pool_listing;
pub use crate::db::queries::pool_token::list_pool_tokens_by_pool_id;
pub use crate::db::queries::pool_token::upsert_pool_token; pub use crate::db::queries::pool_token::upsert_pool_token;
pub use crate::db::queries::swap::list_recent_swaps; pub use crate::db::queries::swap::list_recent_swaps;
pub use crate::db::queries::swap::upsert_swap; pub use crate::db::queries::swap::upsert_swap;

View File

@@ -66,6 +66,55 @@ LIMIT 1
} }
} }
/// Reads one normalized DEX row by code.
pub async fn get_dex_by_code(
database: &crate::KbDatabase,
code: &str,
) -> Result<std::option::Option<crate::KbDexDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbDexEntity>(
r#"
SELECT
id,
code,
name,
program_id,
router_program_id,
is_enabled,
created_at,
updated_at
FROM kb_dexes
WHERE code = ?
LIMIT 1
"#,
)
.bind(code)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot read kb_dexes '{}' on sqlite: {}",
code, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbDexDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists normalized DEX rows. /// Lists normalized DEX rows.
pub async fn list_dexes( pub async fn list_dexes(
database: &crate::KbDatabase, database: &crate::KbDatabase,
@@ -111,3 +160,49 @@ ORDER BY code ASC
} }
} }
} }
#[cfg(test)]
mod tests {
#[tokio::test]
async fn dex_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("dex_roundtrip.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dex_id = crate::upsert_dex(
&database,
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
None,
None,
true,
),
)
.await
.expect("dex upsert must succeed");
assert!(dex_id > 0);
let dex = crate::get_dex_by_code(&database, "raydium")
.await
.expect("get dex must succeed");
assert!(dex.is_some());
assert_eq!(dex.expect("dex must exist").name, "Raydium");
let dexes = crate::list_dexes(&database)
.await
.expect("list dexes must succeed");
assert_eq!(dexes.len(), 1);
}
}

View File

@@ -59,10 +59,202 @@ LIMIT 1
Ok(id) => Ok(id), Ok(id) => Ok(id),
Err(error) => Err(crate::KbError::Db(format!( Err(error) => Err(crate::KbError::Db(format!(
"cannot fetch kb_pairs id for pool_id '{}' on sqlite: {}", "cannot fetch kb_pairs id for pool_id '{}' on sqlite: {}",
dto.pool_id, dto.pool_id, error
error
))), ))),
} }
}, }
}
}
/// Reads one normalized pair row by pool id.
pub async fn get_pair_by_pool_id(
database: &crate::KbDatabase,
pool_id: i64,
) -> Result<std::option::Option<crate::KbPairDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPairEntity>(
r#"
SELECT
id,
dex_id,
pool_id,
base_token_id,
quote_token_id,
symbol,
first_seen_at,
updated_at
FROM kb_pairs
WHERE pool_id = ?
LIMIT 1
"#,
)
.bind(pool_id)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot read kb_pairs by pool_id '{}' on sqlite: {}",
pool_id, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbPairDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists normalized pairs ordered by id ascending.
pub async fn list_pairs(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbPairDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPairEntity>(
r#"
SELECT
id,
dex_id,
pool_id,
base_token_id,
quote_token_id,
symbol,
first_seen_at,
updated_at
FROM kb_pairs
ORDER BY id ASC
"#,
)
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_pairs on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbPairDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn pair_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("pair_roundtrip.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dex_id = crate::upsert_dex(
&database,
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
None,
None,
true,
),
)
.await
.expect("dex upsert must succeed");
let base_token_id = crate::upsert_token(
&database,
&crate::KbTokenDto::new(
"Base111111111111111111111111111111111111111".to_string(),
Some("BASE".to_string()),
Some("Base Token".to_string()),
Some(6),
crate::SPL_TOKEN_PROGRAM_ID.to_string(),
false,
),
)
.await
.expect("base token upsert must succeed");
let quote_token_id = crate::upsert_token(
&database,
&crate::KbTokenDto::new(
"So11111111111111111111111111111111111111112".to_string(),
Some("WSOL".to_string()),
Some("Wrapped SOL".to_string()),
Some(9),
crate::SPL_TOKEN_PROGRAM_ID.to_string(),
true,
),
)
.await
.expect("quote token upsert must succeed");
let pool_id = crate::upsert_pool(
&database,
&crate::KbPoolDto::new(
dex_id,
"Pool111111111111111111111111111111111111111".to_string(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Active,
),
)
.await
.expect("pool upsert must succeed");
let pair_id = crate::upsert_pair(
&database,
&crate::KbPairDto::new(
dex_id,
pool_id,
base_token_id,
quote_token_id,
Some("BASE/WSOL".to_string()),
),
)
.await
.expect("pair upsert must succeed");
assert!(pair_id > 0);
let pair = crate::get_pair_by_pool_id(&database, pool_id)
.await
.expect("get pair must succeed");
assert!(pair.is_some());
assert_eq!(
pair.expect("pair must exist").symbol.as_deref(),
Some("BASE/WSOL")
);
let pairs = crate::list_pairs(&database)
.await
.expect("list pairs must succeed");
assert_eq!(pairs.len(), 1);
} }
} }

View File

@@ -62,3 +62,157 @@ LIMIT 1
} }
} }
} }
/// Reads one normalized pool row by address.
pub async fn get_pool_by_address(
database: &crate::KbDatabase,
address: &str,
) -> Result<std::option::Option<crate::KbPoolDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPoolEntity>(
r#"
SELECT
id,
dex_id,
address,
pool_kind,
status,
first_seen_at,
updated_at
FROM kb_pools
WHERE address = ?
LIMIT 1
"#,
)
.bind(address)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot read kb_pools '{}' on sqlite: {}",
address, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbPoolDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists normalized pools ordered by id ascending.
pub async fn list_pools(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbPoolDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPoolEntity>(
r#"
SELECT
id,
dex_id,
address,
pool_kind,
status,
first_seen_at,
updated_at
FROM kb_pools
ORDER BY id ASC
"#,
)
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_pools on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbPoolDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn pool_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("pool_roundtrip.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dex_id = crate::upsert_dex(
&database,
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
None,
None,
true,
),
)
.await
.expect("dex upsert must succeed");
let pool_id = crate::upsert_pool(
&database,
&crate::KbPoolDto::new(
dex_id,
"Pool111111111111111111111111111111111111111".to_string(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Active,
),
)
.await
.expect("pool upsert must succeed");
assert!(pool_id > 0);
let pool =
crate::get_pool_by_address(&database, "Pool111111111111111111111111111111111111111")
.await
.expect("get pool must succeed");
assert!(pool.is_some());
assert_eq!(
pool.expect("pool must exist").pool_kind,
crate::KbPoolKind::Amm
);
let pools = crate::list_pools(&database)
.await
.expect("list pools must succeed");
assert_eq!(pools.len(), 1);
}
}

View File

@@ -76,12 +76,113 @@ LIMIT 1
} }
} }
/// Reads one normalized pool listing row by pool id.
pub async fn get_pool_listing_by_pool_id(
database: &crate::KbDatabase,
pool_id: i64,
) -> Result<std::option::Option<crate::KbPoolListingDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPoolListingEntity>(
r#"
SELECT
id,
dex_id,
pool_id,
pair_id,
source_kind,
source_endpoint_name,
detected_at,
initial_base_reserve,
initial_quote_reserve,
initial_price_quote,
updated_at
FROM kb_pool_listings
WHERE pool_id = ?
LIMIT 1
"#,
)
.bind(pool_id)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot read kb_pool_listings by pool_id '{}' on sqlite: {}",
pool_id, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbPoolListingDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists normalized pool listings ordered by detected date then id.
pub async fn list_pool_listings(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbPoolListingDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPoolListingEntity>(
r#"
SELECT
id,
dex_id,
pool_id,
pair_id,
source_kind,
source_endpoint_name,
detected_at,
initial_base_reserve,
initial_quote_reserve,
initial_price_quote,
updated_at
FROM kb_pool_listings
ORDER BY detected_at ASC, id ASC
"#,
)
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_pool_listings on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbPoolListingDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[tokio::test] #[tokio::test]
async fn normalized_model_roundtrip_works() { async fn pool_listing_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed"); let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("normalized_model.sqlite3"); let database_path = tempdir.path().join("pool_listing_roundtrip.sqlite3");
let config = crate::KbDatabaseConfig { let config = crate::KbDatabaseConfig {
enabled: true, enabled: true,
backend: crate::KbDatabaseBackend::Sqlite, backend: crate::KbDatabaseBackend::Sqlite,
@@ -112,12 +213,12 @@ mod tests {
let base_token_id = crate::upsert_token( let base_token_id = crate::upsert_token(
&database, &database,
&crate::KbTokenDto::new( &crate::KbTokenDto::new(
"So11111111111111111111111111111111111111112".to_string(), "Base111111111111111111111111111111111111111".to_string(),
Some("WSOL".to_string()), Some("BASE".to_string()),
Some("Wrapped SOL".to_string()), Some("Base Token".to_string()),
Some(9), Some(6),
crate::SPL_TOKEN_PROGRAM_ID.to_string(), crate::SPL_TOKEN_PROGRAM_ID.to_string(),
true, false,
), ),
) )
.await .await
@@ -125,12 +226,12 @@ mod tests {
let quote_token_id = crate::upsert_token( let quote_token_id = crate::upsert_token(
&database, &database,
&crate::KbTokenDto::new( &crate::KbTokenDto::new(
"DezX111111111111111111111111111111111111111".to_string(), "So11111111111111111111111111111111111111112".to_string(),
Some("TEST".to_string()), Some("WSOL".to_string()),
Some("Test Token".to_string()), Some("Wrapped SOL".to_string()),
Some(6), Some(9),
crate::SPL_TOKEN_PROGRAM_ID.to_string(), crate::SPL_TOKEN_PROGRAM_ID.to_string(),
false, true,
), ),
) )
.await .await
@@ -151,38 +252,14 @@ mod tests {
&crate::KbPairDto::new( &crate::KbPairDto::new(
dex_id, dex_id,
pool_id, pool_id,
quote_token_id,
base_token_id, base_token_id,
Some("TEST/WSOL".to_string()), quote_token_id,
Some("BASE/WSOL".to_string()),
), ),
) )
.await .await
.expect("pair upsert must succeed"); .expect("pair upsert must succeed");
let _pool_token_a_id = crate::upsert_pool_token( let pool_listing_id = crate::upsert_pool_listing(
&database,
&crate::KbPoolTokenDto::new(
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Base,
None,
Some(0),
),
)
.await
.expect("pool token a upsert must succeed");
let _pool_token_b_id = crate::upsert_pool_token(
&database,
&crate::KbPoolTokenDto::new(
pool_id,
base_token_id,
crate::KbPoolTokenRole::Quote,
None,
Some(1),
),
)
.await
.expect("pool token b upsert must succeed");
let listing_id = crate::upsert_pool_listing(
&database, &database,
&crate::KbPoolListingDto::new( &crate::KbPoolListingDto::new(
dex_id, dex_id,
@@ -191,21 +268,24 @@ mod tests {
crate::KbObservationSourceKind::WsRpc, crate::KbObservationSourceKind::WsRpc,
Some("mainnet_public_ws_slots".to_string()), Some("mainnet_public_ws_slots".to_string()),
Some(1000.0), Some(1000.0),
Some(42.0), Some(10.0),
Some(0.042), Some(0.01),
), ),
) )
.await .await
.expect("pool listing upsert must succeed"); .expect("pool listing upsert must succeed");
assert!(listing_id > 0); assert!(pool_listing_id > 0);
let dexes = crate::list_dexes(&database) let pool_listing = crate::get_pool_listing_by_pool_id(&database, pool_id)
.await .await
.expect("dex list must succeed"); .expect("get pool listing must succeed");
assert_eq!(dexes.len(), 1); assert!(pool_listing.is_some());
let token = assert_eq!(
crate::get_token_by_mint(&database, "So11111111111111111111111111111111111111112") pool_listing.expect("pool listing must exist").source_kind,
.await crate::KbObservationSourceKind::WsRpc
.expect("token get must succeed"); );
assert!(token.is_some()); let pool_listings = crate::list_pool_listings(&database)
.await
.expect("list pool listings must succeed");
assert_eq!(pool_listings.len(), 1);
} }
} }

View File

@@ -62,6 +62,133 @@ LIMIT 1
error error
))), ))),
} }
}, }
}
}
/// Lists normalized pool token rows by pool id.
pub async fn list_pool_tokens_by_pool_id(
database: &crate::KbDatabase,
pool_id: i64,
) -> Result<std::vec::Vec<crate::KbPoolTokenDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbPoolTokenEntity>(
r#"
SELECT
id,
pool_id,
token_id,
role,
vault_address,
token_order,
created_at,
updated_at
FROM kb_pool_tokens
WHERE pool_id = ?
ORDER BY token_order ASC, id ASC
"#,
)
.bind(pool_id)
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_pool_tokens for pool_id '{}' on sqlite: {}",
pool_id, error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbPoolTokenDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn pool_token_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("pool_token_roundtrip.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dex_id = crate::upsert_dex(
&database,
&crate::KbDexDto::new(
"raydium".to_string(),
"Raydium".to_string(),
None,
None,
true,
),
)
.await
.expect("dex upsert must succeed");
let token_id = crate::upsert_token(
&database,
&crate::KbTokenDto::new(
"Base111111111111111111111111111111111111111".to_string(),
Some("BASE".to_string()),
Some("Base Token".to_string()),
Some(6),
crate::SPL_TOKEN_PROGRAM_ID.to_string(),
false,
),
)
.await
.expect("token upsert must succeed");
let pool_id = crate::upsert_pool(
&database,
&crate::KbPoolDto::new(
dex_id,
"Pool111111111111111111111111111111111111111".to_string(),
crate::KbPoolKind::Amm,
crate::KbPoolStatus::Active,
),
)
.await
.expect("pool upsert must succeed");
let pool_token_id = crate::upsert_pool_token(
&database,
&crate::KbPoolTokenDto::new(
pool_id,
token_id,
crate::KbPoolTokenRole::Base,
Some("Vault111111111111111111111111111111111111".to_string()),
Some(0),
),
)
.await
.expect("pool token upsert must succeed");
assert!(pool_token_id > 0);
let pool_tokens = crate::list_pool_tokens_by_pool_id(&database, pool_id)
.await
.expect("list pool tokens must succeed");
assert_eq!(pool_tokens.len(), 1);
assert_eq!(pool_tokens[0].role, crate::KbPoolTokenRole::Base);
} }
} }

File diff suppressed because it is too large Load Diff

View File

@@ -143,3 +143,11 @@ pub use crate::db::list_known_ws_endpoints;
pub use crate::db::list_recent_db_runtime_events; pub use crate::db::list_recent_db_runtime_events;
pub use crate::db::upsert_known_http_endpoint; pub use crate::db::upsert_known_http_endpoint;
pub use crate::db::upsert_known_ws_endpoint; pub use crate::db::upsert_known_ws_endpoint;
pub use crate::db::get_dex_by_code;
pub use crate::db::get_pair_by_pool_id;
pub use crate::db::get_pool_by_address;
pub use crate::db::get_pool_listing_by_pool_id;
pub use crate::db::list_pairs;
pub use crate::db::list_pool_listings;
pub use crate::db::list_pool_tokens_by_pool_id;
pub use crate::db::list_pools;