diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fce33b..d089f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,3 +21,4 @@ 0.5.2 - Ajout de la table des tokens observés, de leur statut local et des premières requêtes de persistance associées 0.5.3 - Préparation du stockage local des événements techniques et des signaux utiles à l’analyse, avec distinction runtime / on-chain / métier 0.5.4 - Ajout du modèle métier normalisé initial pour les DEX, tokens, pools, paires, composition des pools et listings +0.5.5 - Activité métier normalisé diff --git a/Cargo.toml b/Cargo.toml index b5f19b9..d159dd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.5.4" +version = "0.5.5" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index 0917a90..cb2a21f 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -21,26 +21,34 @@ pub use crate::db::dtos::KbDbRuntimeEventDto; pub use crate::db::dtos::KbDexDto; pub use crate::db::dtos::KbKnownHttpEndpointDto; pub use crate::db::dtos::KbKnownWsEndpointDto; +pub use crate::db::dtos::KbLiquidityEventDto; pub use crate::db::dtos::KbObservedTokenDto; pub use crate::db::dtos::KbOnchainObservationDto; pub use crate::db::dtos::KbPairDto; pub use crate::db::dtos::KbPoolDto; pub use crate::db::dtos::KbPoolListingDto; pub use crate::db::dtos::KbPoolTokenDto; +pub use crate::db::dtos::KbSwapDto; +pub use crate::db::dtos::KbTokenBurnEventDto; pub use crate::db::dtos::KbTokenDto; +pub use crate::db::dtos::KbTokenMintEventDto; pub use crate::db::entities::KbAnalysisSignalEntity; pub use crate::db::entities::KbDbMetadataEntity; pub use crate::db::entities::KbDbRuntimeEventEntity; pub use crate::db::entities::KbDexEntity; pub use crate::db::entities::KbKnownHttpEndpointEntity; pub use crate::db::entities::KbKnownWsEndpointEntity; +pub use crate::db::entities::KbLiquidityEventEntity; pub use crate::db::entities::KbObservedTokenEntity; pub use crate::db::entities::KbOnchainObservationEntity; pub use crate::db::entities::KbPairEntity; pub use crate::db::entities::KbPoolEntity; pub use crate::db::entities::KbPoolListingEntity; pub use crate::db::entities::KbPoolTokenEntity; +pub use crate::db::entities::KbSwapEntity; +pub use crate::db::entities::KbTokenBurnEventEntity; pub use crate::db::entities::KbTokenEntity; +pub use crate::db::entities::KbTokenMintEventEntity; pub use crate::db::queries::get_db_metadata; pub use crate::db::queries::get_known_http_endpoint; pub use crate::db::queries::get_known_ws_endpoint; @@ -56,22 +64,32 @@ pub use crate::db::queries::list_known_ws_endpoints; pub use crate::db::queries::list_observed_tokens; pub use crate::db::queries::list_recent_analysis_signals; pub use crate::db::queries::list_recent_db_runtime_events; +pub use crate::db::queries::list_recent_liquidity_events; pub use crate::db::queries::list_recent_onchain_observations; +pub use crate::db::queries::list_recent_swaps; +pub use crate::db::queries::list_recent_token_burn_events; +pub use crate::db::queries::list_recent_token_mint_events; pub use crate::db::queries::upsert_db_metadata; pub use crate::db::queries::upsert_dex; pub use crate::db::queries::upsert_known_http_endpoint; pub use crate::db::queries::upsert_known_ws_endpoint; +pub use crate::db::queries::upsert_liquidity_event; pub use crate::db::queries::upsert_observed_token; pub use crate::db::queries::upsert_pair; pub use crate::db::queries::upsert_pool; pub use crate::db::queries::upsert_pool_listing; pub use crate::db::queries::upsert_pool_token; +pub use crate::db::queries::upsert_swap; pub use crate::db::queries::upsert_token; +pub use crate::db::queries::upsert_token_burn_event; +pub use crate::db::queries::upsert_token_mint_event; pub use crate::db::types::KbAnalysisSignalSeverity; pub use crate::db::types::KbDatabaseBackend; pub use crate::db::types::KbDbRuntimeEventLevel; +pub use crate::db::types::KbLiquidityEventKind; pub use crate::db::types::KbObservationSourceKind; pub use crate::db::types::KbObservedTokenStatus; pub use crate::db::types::KbPoolKind; pub use crate::db::types::KbPoolStatus; pub use crate::db::types::KbPoolTokenRole; +pub use crate::db::types::KbSwapTradeSide; diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs index 3ae4628..5f5e82c 100644 --- a/kb_lib/src/db/dtos.rs +++ b/kb_lib/src/db/dtos.rs @@ -8,13 +8,17 @@ mod db_runtime_event; mod dex; mod known_http_endpoint; mod known_ws_endpoint; +mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; mod pool; mod pool_listing; mod pool_token; +mod swap; mod token; +mod token_burn_event; +mod token_mint_event; pub use crate::db::dtos::analysis_signal::KbAnalysisSignalDto; pub use crate::db::dtos::db_metadata::KbDbMetadataDto; @@ -22,10 +26,14 @@ pub use crate::db::dtos::db_runtime_event::KbDbRuntimeEventDto; pub use crate::db::dtos::dex::KbDexDto; pub use crate::db::dtos::known_http_endpoint::KbKnownHttpEndpointDto; pub use crate::db::dtos::known_ws_endpoint::KbKnownWsEndpointDto; +pub use crate::db::dtos::liquidity_event::KbLiquidityEventDto; pub use crate::db::dtos::observed_token::KbObservedTokenDto; pub use crate::db::dtos::onchain_observation::KbOnchainObservationDto; pub use crate::db::dtos::pair::KbPairDto; pub use crate::db::dtos::pool::KbPoolDto; pub use crate::db::dtos::pool_listing::KbPoolListingDto; pub use crate::db::dtos::pool_token::KbPoolTokenDto; +pub use crate::db::dtos::swap::KbSwapDto; pub use crate::db::dtos::token::KbTokenDto; +pub use crate::db::dtos::token_burn_event::KbTokenBurnEventDto; +pub use crate::db::dtos::token_mint_event::KbTokenMintEventDto; diff --git a/kb_lib/src/db/dtos/liquidity_event.rs b/kb_lib/src/db/dtos/liquidity_event.rs new file mode 100644 index 0000000..4b1929f --- /dev/null +++ b/kb_lib/src/db/dtos/liquidity_event.rs @@ -0,0 +1,134 @@ +// file: kb_lib/src/db/dtos/liquidity_event.rs + +//! Liquidity event DTO. + +/// Application-facing normalized liquidity event DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbLiquidityEventDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Related DEX id. + pub dex_id: i64, + /// Related pool id. + pub pool_id: i64, + /// Optional related pair id. + pub pair_id: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Liquidity event kind. + pub event_kind: crate::KbLiquidityEventKind, + /// Optional actor wallet. + pub actor_wallet: std::option::Option, + /// Base token id. + pub base_token_id: i64, + /// Quote token id. + pub quote_token_id: i64, + /// Optional LP token id. + pub lp_token_id: std::option::Option, + /// Base amount as decimal text. + pub base_amount: std::string::String, + /// Quote amount as decimal text. + pub quote_amount: std::string::String, + /// Optional LP amount as decimal text. + pub lp_amount: std::option::Option, + /// Execution timestamp. + pub executed_at: chrono::DateTime, +} + +impl KbLiquidityEventDto { + /// Creates a new liquidity event DTO. + pub fn new( + dex_id: i64, + pool_id: i64, + pair_id: std::option::Option, + signature: std::string::String, + instruction_index: i64, + slot: std::option::Option, + event_kind: crate::KbLiquidityEventKind, + actor_wallet: std::option::Option, + base_token_id: i64, + quote_token_id: i64, + lp_token_id: std::option::Option, + base_amount: std::string::String, + quote_amount: std::string::String, + lp_amount: std::option::Option, + ) -> Self { + Self { + id: None, + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + event_kind, + actor_wallet, + base_token_id, + quote_token_id, + lp_token_id, + base_amount, + quote_amount, + lp_amount, + executed_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbLiquidityEventDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbLiquidityEventEntity) -> Result { + let event_kind_result = crate::KbLiquidityEventKind::from_i16(entity.event_kind); + let event_kind = match event_kind_result { + Ok(event_kind) => event_kind, + Err(error) => return Err(error), + }; + let executed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.executed_at); + let executed_at = match executed_at_result { + Ok(executed_at) => executed_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse liquidity event executed_at '{}': {}", + entity.executed_at, error + ))); + } + }; + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert liquidity event slot '{}' to u64: {}", + slot, error + ))); + } + } + } + None => None, + }; + Ok(Self { + id: Some(entity.id), + dex_id: entity.dex_id, + pool_id: entity.pool_id, + pair_id: entity.pair_id, + signature: entity.signature, + instruction_index: entity.instruction_index, + slot, + event_kind, + actor_wallet: entity.actor_wallet, + base_token_id: entity.base_token_id, + quote_token_id: entity.quote_token_id, + lp_token_id: entity.lp_token_id, + base_amount: entity.base_amount, + quote_amount: entity.quote_amount, + lp_amount: entity.lp_amount, + executed_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/swap.rs b/kb_lib/src/db/dtos/swap.rs new file mode 100644 index 0000000..bbc0718 --- /dev/null +++ b/kb_lib/src/db/dtos/swap.rs @@ -0,0 +1,129 @@ +// file: kb_lib/src/db/dtos/swap.rs + +//! Swap DTO. + +/// Application-facing normalized swap DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbSwapDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Related DEX id. + pub dex_id: i64, + /// Related pool id. + pub pool_id: i64, + /// Optional related pair id. + pub pair_id: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional trader wallet. + pub trader_wallet: std::option::Option, + /// Base token id. + pub base_token_id: i64, + /// Quote token id. + pub quote_token_id: i64, + /// Base amount as decimal text. + pub base_amount: std::string::String, + /// Quote amount as decimal text. + pub quote_amount: std::string::String, + /// Optional price in quote units as decimal text. + pub price_quote: std::option::Option, + /// Trade side relative to the base token. + pub trade_side: crate::KbSwapTradeSide, + /// Execution timestamp. + pub executed_at: chrono::DateTime, +} + +impl KbSwapDto { + /// Creates a new swap DTO. + pub fn new( + dex_id: i64, + pool_id: i64, + pair_id: std::option::Option, + signature: std::string::String, + instruction_index: i64, + slot: std::option::Option, + trader_wallet: std::option::Option, + base_token_id: i64, + quote_token_id: i64, + base_amount: std::string::String, + quote_amount: std::string::String, + price_quote: std::option::Option, + trade_side: crate::KbSwapTradeSide, + ) -> Self { + Self { + id: None, + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + trader_wallet, + base_token_id, + quote_token_id, + base_amount, + quote_amount, + price_quote, + trade_side, + executed_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbSwapDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbSwapEntity) -> Result { + let trade_side_result = crate::KbSwapTradeSide::from_i16(entity.trade_side); + let trade_side = match trade_side_result { + Ok(trade_side) => trade_side, + Err(error) => return Err(error), + }; + let executed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.executed_at); + let executed_at = match executed_at_result { + Ok(executed_at) => executed_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse swap executed_at '{}': {}", + entity.executed_at, error + ))); + } + }; + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert swap slot '{}' to u64: {}", + slot, error + ))); + } + } + } + None => None, + }; + Ok(Self { + id: Some(entity.id), + dex_id: entity.dex_id, + pool_id: entity.pool_id, + pair_id: entity.pair_id, + signature: entity.signature, + instruction_index: entity.instruction_index, + slot, + trader_wallet: entity.trader_wallet, + base_token_id: entity.base_token_id, + quote_token_id: entity.quote_token_id, + base_amount: entity.base_amount, + quote_amount: entity.quote_amount, + price_quote: entity.price_quote, + trade_side, + executed_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/token_burn_event.rs b/kb_lib/src/db/dtos/token_burn_event.rs new file mode 100644 index 0000000..7f5ade5 --- /dev/null +++ b/kb_lib/src/db/dtos/token_burn_event.rs @@ -0,0 +1,103 @@ +// file: kb_lib/src/db/dtos/token_burn_event.rs + +//! Token burn event DTO. + +/// Application-facing normalized token burn event DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbTokenBurnEventDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Related token id. + pub token_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional authority wallet. + pub authority_wallet: std::option::Option, + /// Optional source wallet. + pub source_wallet: std::option::Option, + /// Burned amount as decimal text. + pub amount: std::string::String, + /// Optional supply after burn as decimal text. + pub supply_after: std::option::Option, + /// Execution timestamp. + pub executed_at: chrono::DateTime, +} + +impl KbTokenBurnEventDto { + /// Creates a new token burn event DTO. + pub fn new( + token_id: i64, + signature: std::string::String, + instruction_index: i64, + slot: std::option::Option, + authority_wallet: std::option::Option, + source_wallet: std::option::Option, + amount: std::string::String, + supply_after: std::option::Option, + ) -> Self { + Self { + id: None, + token_id, + signature, + instruction_index, + slot, + authority_wallet, + source_wallet, + amount, + supply_after, + executed_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbTokenBurnEventDto { + type Error = crate::KbError; + + fn try_from( + entity: crate::KbTokenBurnEventEntity, + ) -> Result { + let executed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.executed_at); + let executed_at = match executed_at_result { + Ok(executed_at) => executed_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse token burn event executed_at '{}': {}", + entity.executed_at, + error + ))); + }, + }; + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert token burn event slot '{}' to u64: {}", + slot, + error + ))); + }, + } + }, + None => None, + }; + Ok(Self { + id: Some(entity.id), + token_id: entity.token_id, + signature: entity.signature, + instruction_index: entity.instruction_index, + slot, + authority_wallet: entity.authority_wallet, + source_wallet: entity.source_wallet, + amount: entity.amount, + supply_after: entity.supply_after, + executed_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/token_mint_event.rs b/kb_lib/src/db/dtos/token_mint_event.rs new file mode 100644 index 0000000..a45b218 --- /dev/null +++ b/kb_lib/src/db/dtos/token_mint_event.rs @@ -0,0 +1,99 @@ +// file: kb_lib/src/db/dtos/token_mint_event.rs + +//! Token mint event DTO. + +/// Application-facing normalized token mint event DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbTokenMintEventDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Related token id. + pub token_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional mint authority wallet. + pub authority_wallet: std::option::Option, + /// Optional destination wallet. + pub destination_wallet: std::option::Option, + /// Minted amount as decimal text. + pub amount: std::string::String, + /// Optional supply after mint as decimal text. + pub supply_after: std::option::Option, + /// Execution timestamp. + pub executed_at: chrono::DateTime, +} + +impl KbTokenMintEventDto { + /// Creates a new token mint event DTO. + pub fn new( + token_id: i64, + signature: std::string::String, + instruction_index: i64, + slot: std::option::Option, + authority_wallet: std::option::Option, + destination_wallet: std::option::Option, + amount: std::string::String, + supply_after: std::option::Option, + ) -> Self { + Self { + id: None, + token_id, + signature, + instruction_index, + slot, + authority_wallet, + destination_wallet, + amount, + supply_after, + executed_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbTokenMintEventDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbTokenMintEventEntity) -> Result { + let executed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.executed_at); + let executed_at = match executed_at_result { + Ok(executed_at) => executed_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse token mint event executed_at '{}': {}", + entity.executed_at, error + ))); + } + }; + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert token mint event slot '{}' to u64: {}", + slot, error + ))); + } + } + } + None => None, + }; + Ok(Self { + id: Some(entity.id), + token_id: entity.token_id, + signature: entity.signature, + instruction_index: entity.instruction_index, + slot, + authority_wallet: entity.authority_wallet, + destination_wallet: entity.destination_wallet, + amount: entity.amount, + supply_after: entity.supply_after, + executed_at, + }) + } +} diff --git a/kb_lib/src/db/entities.rs b/kb_lib/src/db/entities.rs index 0cd5ee5..da4a988 100644 --- a/kb_lib/src/db/entities.rs +++ b/kb_lib/src/db/entities.rs @@ -10,13 +10,17 @@ mod db_runtime_event; mod dex; mod known_http_endpoint; mod known_ws_endpoint; +mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; mod pool; mod pool_listing; mod pool_token; +mod swap; mod token; +mod token_burn_event; +mod token_mint_event; pub use crate::db::entities::analysis_signal::KbAnalysisSignalEntity; pub use crate::db::entities::db_metadata::KbDbMetadataEntity; @@ -24,10 +28,14 @@ pub use crate::db::entities::db_runtime_event::KbDbRuntimeEventEntity; pub use crate::db::entities::dex::KbDexEntity; pub use crate::db::entities::known_http_endpoint::KbKnownHttpEndpointEntity; pub use crate::db::entities::known_ws_endpoint::KbKnownWsEndpointEntity; +pub use crate::db::entities::liquidity_event::KbLiquidityEventEntity; pub use crate::db::entities::observed_token::KbObservedTokenEntity; pub use crate::db::entities::onchain_observation::KbOnchainObservationEntity; pub use crate::db::entities::pair::KbPairEntity; pub use crate::db::entities::pool::KbPoolEntity; pub use crate::db::entities::pool_listing::KbPoolListingEntity; pub use crate::db::entities::pool_token::KbPoolTokenEntity; +pub use crate::db::entities::swap::KbSwapEntity; pub use crate::db::entities::token::KbTokenEntity; +pub use crate::db::entities::token_burn_event::KbTokenBurnEventEntity; +pub use crate::db::entities::token_mint_event::KbTokenMintEventEntity; diff --git a/kb_lib/src/db/entities/liquidity_event.rs b/kb_lib/src/db/entities/liquidity_event.rs new file mode 100644 index 0000000..936cbf5 --- /dev/null +++ b/kb_lib/src/db/entities/liquidity_event.rs @@ -0,0 +1,40 @@ +// file: kb_lib/src/db/entities/liquidity_event.rs + +//! Liquidity event entity. + +/// Persisted normalized liquidity event row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbLiquidityEventEntity { + /// Numeric primary key. + pub id: i64, + /// Related DEX id. + pub dex_id: i64, + /// Related pool id. + pub pool_id: i64, + /// Optional related pair id. + pub pair_id: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Event kind stored as stable integer. + pub event_kind: i16, + /// Optional actor wallet. + pub actor_wallet: std::option::Option, + /// Base token id. + pub base_token_id: i64, + /// Quote token id. + pub quote_token_id: i64, + /// Optional LP token id. + pub lp_token_id: std::option::Option, + /// Base amount as decimal text. + pub base_amount: std::string::String, + /// Quote amount as decimal text. + pub quote_amount: std::string::String, + /// Optional LP amount as decimal text. + pub lp_amount: std::option::Option, + /// Execution timestamp encoded as RFC3339 UTC text. + pub executed_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/swap.rs b/kb_lib/src/db/entities/swap.rs new file mode 100644 index 0000000..a948ed6 --- /dev/null +++ b/kb_lib/src/db/entities/swap.rs @@ -0,0 +1,38 @@ +// file: kb_lib/src/db/entities/swap.rs + +//! Swap entity. + +/// Persisted normalized swap row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbSwapEntity { + /// Numeric primary key. + pub id: i64, + /// Related DEX id. + pub dex_id: i64, + /// Related pool id. + pub pool_id: i64, + /// Optional related pair id. + pub pair_id: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional trader wallet. + pub trader_wallet: std::option::Option, + /// Base token id. + pub base_token_id: i64, + /// Quote token id. + pub quote_token_id: i64, + /// Base amount as decimal text. + pub base_amount: std::string::String, + /// Quote amount as decimal text. + pub quote_amount: std::string::String, + /// Optional price in quote units as decimal text. + pub price_quote: std::option::Option, + /// Trade side stored as stable integer. + pub trade_side: i16, + /// Execution timestamp encoded as RFC3339 UTC text. + pub executed_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/token_burn_event.rs b/kb_lib/src/db/entities/token_burn_event.rs new file mode 100644 index 0000000..c797898 --- /dev/null +++ b/kb_lib/src/db/entities/token_burn_event.rs @@ -0,0 +1,28 @@ +// file: kb_lib/src/db/entities/token_burn_event.rs + +//! Token burn event entity. + +/// Persisted normalized token burn event row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbTokenBurnEventEntity { + /// Numeric primary key. + pub id: i64, + /// Related token id. + pub token_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional authority wallet. + pub authority_wallet: std::option::Option, + /// Optional source wallet. + pub source_wallet: std::option::Option, + /// Burned amount as decimal text. + pub amount: std::string::String, + /// Optional supply after burn as decimal text. + pub supply_after: std::option::Option, + /// Execution timestamp encoded as RFC3339 UTC text. + pub executed_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/token_mint_event.rs b/kb_lib/src/db/entities/token_mint_event.rs new file mode 100644 index 0000000..cfda6bb --- /dev/null +++ b/kb_lib/src/db/entities/token_mint_event.rs @@ -0,0 +1,28 @@ +// file: kb_lib/src/db/entities/token_mint_event.rs + +//! Token mint event entity. + +/// Persisted normalized token mint event row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbTokenMintEventEntity { + /// Numeric primary key. + pub id: i64, + /// Related token id. + pub token_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Instruction index inside the transaction. + pub instruction_index: i64, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional mint authority wallet. + pub authority_wallet: std::option::Option, + /// Optional destination wallet. + pub destination_wallet: std::option::Option, + /// Minted amount as decimal text. + pub amount: std::string::String, + /// Optional supply after mint as decimal text. + pub supply_after: std::option::Option, + /// Execution timestamp encoded as RFC3339 UTC text. + pub executed_at: std::string::String, +} diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index 7f38005..a8909dc 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -8,13 +8,17 @@ mod db_runtime_event; mod dex; mod known_http_endpoint; mod known_ws_endpoint; +mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; mod pool; mod pool_listing; mod pool_token; +mod swap; mod token; +mod token_burn_event; +mod token_mint_event; pub use crate::db::queries::analysis_signal::insert_analysis_signal; pub use crate::db::queries::analysis_signal::list_recent_analysis_signals; @@ -31,6 +35,8 @@ pub use crate::db::queries::known_http_endpoint::upsert_known_http_endpoint; pub use crate::db::queries::known_ws_endpoint::get_known_ws_endpoint; pub use crate::db::queries::known_ws_endpoint::list_known_ws_endpoints; pub use crate::db::queries::known_ws_endpoint::upsert_known_ws_endpoint; +pub use crate::db::queries::liquidity_event::list_recent_liquidity_events; +pub use crate::db::queries::liquidity_event::upsert_liquidity_event; pub use crate::db::queries::observed_token::get_observed_token_by_mint; pub use crate::db::queries::observed_token::list_observed_tokens; pub use crate::db::queries::observed_token::upsert_observed_token; @@ -40,5 +46,11 @@ pub use crate::db::queries::pair::upsert_pair; pub use crate::db::queries::pool::upsert_pool; pub use crate::db::queries::pool_listing::upsert_pool_listing; pub use crate::db::queries::pool_token::upsert_pool_token; +pub use crate::db::queries::swap::list_recent_swaps; +pub use crate::db::queries::swap::upsert_swap; pub use crate::db::queries::token::get_token_by_mint; pub use crate::db::queries::token::upsert_token; +pub use crate::db::queries::token_burn_event::list_recent_token_burn_events; +pub use crate::db::queries::token_burn_event::upsert_token_burn_event; +pub use crate::db::queries::token_mint_event::list_recent_token_mint_events; +pub use crate::db::queries::token_mint_event::upsert_token_mint_event; diff --git a/kb_lib/src/db/queries/liquidity_event.rs b/kb_lib/src/db/queries/liquidity_event.rs new file mode 100644 index 0000000..bf45239 --- /dev/null +++ b/kb_lib/src/db/queries/liquidity_event.rs @@ -0,0 +1,167 @@ +// file: kb_lib/src/db/queries/liquidity_event.rs + +//! Queries for `kb_liquidity_events`. + +/// Inserts or updates one normalized liquidity event row. +pub async fn upsert_liquidity_event( + database: &crate::KbDatabase, + dto: &crate::KbLiquidityEventDto, +) -> Result { + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert liquidity event slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_liquidity_events ( + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + event_kind, + actor_wallet, + base_token_id, + quote_token_id, + lp_token_id, + base_amount, + quote_amount, + lp_amount, + executed_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(signature, instruction_index) DO UPDATE SET + dex_id = excluded.dex_id, + pool_id = excluded.pool_id, + pair_id = excluded.pair_id, + slot = excluded.slot, + event_kind = excluded.event_kind, + actor_wallet = excluded.actor_wallet, + base_token_id = excluded.base_token_id, + quote_token_id = excluded.quote_token_id, + lp_token_id = excluded.lp_token_id, + base_amount = excluded.base_amount, + quote_amount = excluded.quote_amount, + lp_amount = excluded.lp_amount, + executed_at = excluded.executed_at + "#, + ) + .bind(dto.dex_id) + .bind(dto.pool_id) + .bind(dto.pair_id) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .bind(slot_i64) + .bind(dto.event_kind.to_i16()) + .bind(dto.actor_wallet.clone()) + .bind(dto.base_token_id) + .bind(dto.quote_token_id) + .bind(dto.lp_token_id) + .bind(dto.base_amount.clone()) + .bind(dto.quote_amount.clone()) + .bind(dto.lp_amount.clone()) + .bind(dto.executed_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_liquidity_events on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_liquidity_events +WHERE signature = ? AND instruction_index = ? +LIMIT 1 + "#, + ) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_liquidity_events id for signature '{}' and instruction_index '{}' on sqlite: {}", + dto.signature, dto.instruction_index, error + ))), + } + } + } +} + +/// Lists recent liquidity events ordered from newest to oldest. +pub async fn list_recent_liquidity_events( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + event_kind, + actor_wallet, + base_token_id, + quote_token_id, + lp_token_id, + base_amount, + quote_amount, + lp_amount, + executed_at +FROM kb_liquidity_events +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_liquidity_events on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbLiquidityEventDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} diff --git a/kb_lib/src/db/queries/swap.rs b/kb_lib/src/db/queries/swap.rs new file mode 100644 index 0000000..ca15847 --- /dev/null +++ b/kb_lib/src/db/queries/swap.rs @@ -0,0 +1,163 @@ +// file: kb_lib/src/db/queries/swap.rs + +//! Queries for `kb_swaps`. + +/// Inserts or updates one normalized swap row. +pub async fn upsert_swap( + database: &crate::KbDatabase, + dto: &crate::KbSwapDto, +) -> Result { + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert swap slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_swaps ( + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + trader_wallet, + base_token_id, + quote_token_id, + base_amount, + quote_amount, + price_quote, + trade_side, + executed_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(signature, instruction_index) DO UPDATE SET + dex_id = excluded.dex_id, + pool_id = excluded.pool_id, + pair_id = excluded.pair_id, + slot = excluded.slot, + trader_wallet = excluded.trader_wallet, + base_token_id = excluded.base_token_id, + quote_token_id = excluded.quote_token_id, + base_amount = excluded.base_amount, + quote_amount = excluded.quote_amount, + price_quote = excluded.price_quote, + trade_side = excluded.trade_side, + executed_at = excluded.executed_at + "#, + ) + .bind(dto.dex_id) + .bind(dto.pool_id) + .bind(dto.pair_id) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .bind(slot_i64) + .bind(dto.trader_wallet.clone()) + .bind(dto.base_token_id) + .bind(dto.quote_token_id) + .bind(dto.base_amount.clone()) + .bind(dto.quote_amount.clone()) + .bind(dto.price_quote.clone()) + .bind(dto.trade_side.to_i16()) + .bind(dto.executed_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_swaps on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_swaps +WHERE signature = ? AND instruction_index = ? +LIMIT 1 + "#, + ) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_swaps id for signature '{}' and instruction_index '{}' on sqlite: {}", + dto.signature, dto.instruction_index, error + ))), + } + } + } +} + +/// Lists recent swaps ordered from newest to oldest. +pub async fn list_recent_swaps( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + pair_id, + signature, + instruction_index, + slot, + trader_wallet, + base_token_id, + quote_token_id, + base_amount, + quote_amount, + price_quote, + trade_side, + executed_at +FROM kb_swaps +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_swaps on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbSwapDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} diff --git a/kb_lib/src/db/queries/token_burn_event.rs b/kb_lib/src/db/queries/token_burn_event.rs new file mode 100644 index 0000000..f8ed06b --- /dev/null +++ b/kb_lib/src/db/queries/token_burn_event.rs @@ -0,0 +1,319 @@ +// file: kb_lib/src/db/queries/token_burn_event.rs + +//! Queries for `kb_token_burn_events`. + +/// Inserts or updates one normalized token burn event row. +pub async fn upsert_token_burn_event( + database: &crate::KbDatabase, + dto: &crate::KbTokenBurnEventDto, +) -> Result { + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert token burn event slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_token_burn_events ( + token_id, + signature, + instruction_index, + slot, + authority_wallet, + source_wallet, + amount, + supply_after, + executed_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(signature, instruction_index) DO UPDATE SET + token_id = excluded.token_id, + slot = excluded.slot, + authority_wallet = excluded.authority_wallet, + source_wallet = excluded.source_wallet, + amount = excluded.amount, + supply_after = excluded.supply_after, + executed_at = excluded.executed_at + "#, + ) + .bind(dto.token_id) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .bind(slot_i64) + .bind(dto.authority_wallet.clone()) + .bind(dto.source_wallet.clone()) + .bind(dto.amount.clone()) + .bind(dto.supply_after.clone()) + .bind(dto.executed_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_token_burn_events on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_token_burn_events +WHERE signature = ? AND instruction_index = ? +LIMIT 1 + "#, + ) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_token_burn_events id for signature '{}' and instruction_index '{}' on sqlite: {}", + dto.signature, dto.instruction_index, error + ))), + } + } + } +} + +/// Lists recent token burn events ordered from newest to oldest. +pub async fn list_recent_token_burn_events( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + token_id, + signature, + instruction_index, + slot, + authority_wallet, + source_wallet, + amount, + supply_after, + executed_at +FROM kb_token_burn_events +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_token_burn_events on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbTokenBurnEventDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn normalized_activity_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("normalized_activity.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dex_id = crate::upsert_dex( + &database, + &crate::KbDexDto::new( + "raydium".to_string(), + "Raydium".to_string(), + None, + None, + true, + ), + ) + .await + .expect("dex upsert must succeed"); + let base_token_id = crate::upsert_token( + &database, + &crate::KbTokenDto::new( + "Base111111111111111111111111111111111111111".to_string(), + Some("BASE".to_string()), + Some("Base Token".to_string()), + Some(6), + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + false, + ), + ) + .await + .expect("base token upsert must succeed"); + let quote_token_id = crate::upsert_token( + &database, + &crate::KbTokenDto::new( + "So11111111111111111111111111111111111111112".to_string(), + Some("WSOL".to_string()), + Some("Wrapped SOL".to_string()), + Some(9), + crate::SPL_TOKEN_PROGRAM_ID.to_string(), + true, + ), + ) + .await + .expect("quote token upsert must succeed"); + let pool_id = crate::upsert_pool( + &database, + &crate::KbPoolDto::new( + dex_id, + "Pool111111111111111111111111111111111111111".to_string(), + crate::KbPoolKind::Amm, + crate::KbPoolStatus::Active, + ), + ) + .await + .expect("pool upsert must succeed"); + let pair_id = crate::upsert_pair( + &database, + &crate::KbPairDto::new( + dex_id, + pool_id, + base_token_id, + quote_token_id, + Some("BASE/WSOL".to_string()), + ), + ) + .await + .expect("pair upsert must succeed"); + let swap_id = crate::upsert_swap( + &database, + &crate::KbSwapDto::new( + dex_id, + pool_id, + Some(pair_id), + "swap-signature-1".to_string(), + 0, + Some(1000), + Some("Trader11111111111111111111111111111111111".to_string()), + base_token_id, + quote_token_id, + "1000.50".to_string(), + "2.75".to_string(), + Some("0.002748625687156422".to_string()), + crate::KbSwapTradeSide::BuyBase, + ), + ) + .await + .expect("swap upsert must succeed"); + let liquidity_id = crate::upsert_liquidity_event( + &database, + &crate::KbLiquidityEventDto::new( + dex_id, + pool_id, + Some(pair_id), + "liq-signature-1".to_string(), + 1, + Some(1001), + crate::KbLiquidityEventKind::Add, + Some("LpUser1111111111111111111111111111111111".to_string()), + base_token_id, + quote_token_id, + None, + "5000".to_string(), + "15".to_string(), + None, + ), + ) + .await + .expect("liquidity event upsert must succeed"); + let mint_id = crate::upsert_token_mint_event( + &database, + &crate::KbTokenMintEventDto::new( + base_token_id, + "mint-signature-1".to_string(), + 2, + Some(1002), + Some("MintAuthority111111111111111111111111111".to_string()), + Some("Dest1111111111111111111111111111111111111".to_string()), + "1000000".to_string(), + Some("5000000".to_string()), + ), + ) + .await + .expect("token mint event upsert must succeed"); + let burn_id = crate::upsert_token_burn_event( + &database, + &crate::KbTokenBurnEventDto::new( + base_token_id, + "burn-signature-1".to_string(), + 3, + Some(1003), + Some("BurnAuthority111111111111111111111111111".to_string()), + Some("Source11111111111111111111111111111111111".to_string()), + "25000".to_string(), + Some("4975000".to_string()), + ), + ) + .await + .expect("token burn event upsert must succeed"); + assert!(swap_id > 0); + assert!(liquidity_id > 0); + assert!(mint_id > 0); + assert!(burn_id > 0); + let swaps = crate::list_recent_swaps(&database, 10) + .await + .expect("swaps list must succeed"); + let liquidity_events = crate::list_recent_liquidity_events(&database, 10) + .await + .expect("liquidity list must succeed"); + let mint_events = crate::list_recent_token_mint_events(&database, 10) + .await + .expect("mint events list must succeed"); + let burn_events = crate::list_recent_token_burn_events(&database, 10) + .await + .expect("burn events list must succeed"); + assert_eq!(swaps.len(), 1); + assert_eq!(liquidity_events.len(), 1); + assert_eq!(mint_events.len(), 1); + assert_eq!(burn_events.len(), 1); + } +} diff --git a/kb_lib/src/db/queries/token_mint_event.rs b/kb_lib/src/db/queries/token_mint_event.rs new file mode 100644 index 0000000..bd50158 --- /dev/null +++ b/kb_lib/src/db/queries/token_mint_event.rs @@ -0,0 +1,143 @@ +// file: kb_lib/src/db/queries/token_mint_event.rs + +//! Queries for `kb_token_mint_events`. + +/// Inserts or updates one normalized token mint event row. +pub async fn upsert_token_mint_event( + database: &crate::KbDatabase, + dto: &crate::KbTokenMintEventDto, +) -> Result { + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert token mint event slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_token_mint_events ( + token_id, + signature, + instruction_index, + slot, + authority_wallet, + destination_wallet, + amount, + supply_after, + executed_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(signature, instruction_index) DO UPDATE SET + token_id = excluded.token_id, + slot = excluded.slot, + authority_wallet = excluded.authority_wallet, + destination_wallet = excluded.destination_wallet, + amount = excluded.amount, + supply_after = excluded.supply_after, + executed_at = excluded.executed_at + "#, + ) + .bind(dto.token_id) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .bind(slot_i64) + .bind(dto.authority_wallet.clone()) + .bind(dto.destination_wallet.clone()) + .bind(dto.amount.clone()) + .bind(dto.supply_after.clone()) + .bind(dto.executed_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_token_mint_events on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_token_mint_events +WHERE signature = ? AND instruction_index = ? +LIMIT 1 + "#, + ) + .bind(dto.signature.clone()) + .bind(dto.instruction_index) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_token_mint_events id for signature '{}' and instruction_index '{}' on sqlite: {}", + dto.signature, dto.instruction_index, error + ))), + } + } + } +} + +/// Lists recent token mint events ordered from newest to oldest. +pub async fn list_recent_token_mint_events( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + token_id, + signature, + instruction_index, + slot, + authority_wallet, + destination_wallet, + amount, + supply_after, + executed_at +FROM kb_token_mint_events +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_token_mint_events on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbTokenMintEventDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index 93a7b5b..e640b3f 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -148,7 +148,7 @@ CREATE TABLE IF NOT EXISTS kb_pools ( first_seen_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(dex_id) REFERENCES kb_dexes(id) - ) +) "#, r#" CREATE INDEX IF NOT EXISTS kb_idx_pools_dex_id @@ -168,7 +168,7 @@ CREATE TABLE IF NOT EXISTS kb_pairs ( FOREIGN KEY(pool_id) REFERENCES kb_pools(id), FOREIGN KEY(base_token_id) REFERENCES kb_tokens(id), FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id) - ) +) "#, r#" CREATE TABLE IF NOT EXISTS kb_pool_tokens ( @@ -183,7 +183,7 @@ CREATE TABLE IF NOT EXISTS kb_pool_tokens ( FOREIGN KEY(pool_id) REFERENCES kb_pools(id), FOREIGN KEY(token_id) REFERENCES kb_tokens(id), UNIQUE(pool_id, token_id, role) - ) +) "#, r#" CREATE TABLE IF NOT EXISTS kb_pool_listings ( @@ -201,12 +201,128 @@ CREATE TABLE IF NOT EXISTS kb_pool_listings ( FOREIGN KEY(dex_id) REFERENCES kb_dexes(id), FOREIGN KEY(pool_id) REFERENCES kb_pools(id), FOREIGN KEY(pair_id) REFERENCES kb_pairs(id) - ) +) "#, r#" CREATE INDEX IF NOT EXISTS kb_idx_pool_listings_detected_at ON kb_pool_listings (detected_at) "#, + r#" +CREATE TABLE IF NOT EXISTS kb_swaps ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + dex_id INTEGER NOT NULL, + pool_id INTEGER NOT NULL, + pair_id INTEGER NULL, + signature TEXT NOT NULL, + instruction_index INTEGER NOT NULL, + slot INTEGER NULL, + trader_wallet TEXT NULL, + base_token_id INTEGER NOT NULL, + quote_token_id INTEGER NOT NULL, + base_amount TEXT NOT NULL, + quote_amount TEXT NOT NULL, + price_quote TEXT NULL, + trade_side INTEGER NOT NULL, + executed_at TEXT NOT NULL, + FOREIGN KEY(dex_id) REFERENCES kb_dexes(id), + FOREIGN KEY(pool_id) REFERENCES kb_pools(id), + FOREIGN KEY(pair_id) REFERENCES kb_pairs(id), + FOREIGN KEY(base_token_id) REFERENCES kb_tokens(id), + FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id), + UNIQUE(signature, instruction_index) +) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_swaps_pool_id +ON kb_swaps (pool_id) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_swaps_executed_at +ON kb_swaps (executed_at) + "#, + r#" +CREATE TABLE IF NOT EXISTS kb_liquidity_events ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + dex_id INTEGER NOT NULL, + pool_id INTEGER NOT NULL, + pair_id INTEGER NULL, + signature TEXT NOT NULL, + instruction_index INTEGER NOT NULL, + slot INTEGER NULL, + event_kind INTEGER NOT NULL, + actor_wallet TEXT NULL, + base_token_id INTEGER NOT NULL, + quote_token_id INTEGER NOT NULL, + lp_token_id INTEGER NULL, + base_amount TEXT NOT NULL, + quote_amount TEXT NOT NULL, + lp_amount TEXT NULL, + executed_at TEXT NOT NULL, + FOREIGN KEY(dex_id) REFERENCES kb_dexes(id), + FOREIGN KEY(pool_id) REFERENCES kb_pools(id), + FOREIGN KEY(pair_id) REFERENCES kb_pairs(id), + FOREIGN KEY(base_token_id) REFERENCES kb_tokens(id), + FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id), + FOREIGN KEY(lp_token_id) REFERENCES kb_tokens(id), + UNIQUE(signature, instruction_index) +) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_pool_id +ON kb_liquidity_events (pool_id) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_liquidity_events_executed_at +ON kb_liquidity_events (executed_at) + "#, + r#" +CREATE TABLE IF NOT EXISTS kb_token_mint_events ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + token_id INTEGER NOT NULL, + signature TEXT NOT NULL, + instruction_index INTEGER NOT NULL, + slot INTEGER NULL, + authority_wallet TEXT NULL, + destination_wallet TEXT NULL, + amount TEXT NOT NULL, + supply_after TEXT NULL, + executed_at TEXT NOT NULL, + FOREIGN KEY(token_id) REFERENCES kb_tokens(id), + UNIQUE(signature, instruction_index) +) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_token_mint_events_token_id +ON kb_token_mint_events (token_id) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_token_mint_events_executed_at +ON kb_token_mint_events (executed_at) + "#, + r#" +CREATE TABLE IF NOT EXISTS kb_token_burn_events ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + token_id INTEGER NOT NULL, + signature TEXT NOT NULL, + instruction_index INTEGER NOT NULL, + slot INTEGER NULL, + authority_wallet TEXT NULL, + source_wallet TEXT NULL, + amount TEXT NOT NULL, + supply_after TEXT NULL, + executed_at TEXT NOT NULL, + FOREIGN KEY(token_id) REFERENCES kb_tokens(id), + UNIQUE(signature, instruction_index) +) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_token_burn_events_token_id +ON kb_token_burn_events (token_id) + "#, + r#" +CREATE INDEX IF NOT EXISTS kb_idx_token_burn_events_executed_at +ON kb_token_burn_events (executed_at) + "#, ]; for statement in statements { let execute_result = sqlx::query(statement).execute(pool).await; diff --git a/kb_lib/src/db/types.rs b/kb_lib/src/db/types.rs index 727e295..a429406 100644 --- a/kb_lib/src/db/types.rs +++ b/kb_lib/src/db/types.rs @@ -4,18 +4,22 @@ mod analysis_signal_severity; mod database_backend; +mod liquidity_event_kind; mod observation_source_kind; mod observed_token_status; mod pool_kind; mod pool_status; mod pool_token_role; mod runtime_event_level; +mod swap_trade_side; pub use crate::db::types::analysis_signal_severity::KbAnalysisSignalSeverity; pub use crate::db::types::database_backend::KbDatabaseBackend; +pub use crate::db::types::liquidity_event_kind::KbLiquidityEventKind; pub use crate::db::types::observation_source_kind::KbObservationSourceKind; pub use crate::db::types::observed_token_status::KbObservedTokenStatus; pub use crate::db::types::pool_kind::KbPoolKind; pub use crate::db::types::pool_status::KbPoolStatus; pub use crate::db::types::pool_token_role::KbPoolTokenRole; pub use crate::db::types::runtime_event_level::KbDbRuntimeEventLevel; +pub use crate::db::types::swap_trade_side::KbSwapTradeSide; diff --git a/kb_lib/src/db/types/liquidity_event_kind.rs b/kb_lib/src/db/types/liquidity_event_kind.rs new file mode 100644 index 0000000..1671590 --- /dev/null +++ b/kb_lib/src/db/types/liquidity_event_kind.rs @@ -0,0 +1,34 @@ +// file: kb_lib/src/db/types/liquidity_event_kind.rs + +//! Liquidity event kind. + +/// Normalized liquidity event kind. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum KbLiquidityEventKind { + /// Liquidity addition. + Add, + /// Liquidity removal. + Remove, +} + +impl KbLiquidityEventKind { + /// Converts the event kind to its stable integer representation. + pub fn to_i16(self) -> i16 { + match self { + Self::Add => 0, + Self::Remove => 1, + } + } + + /// Restores an event kind from its stable integer representation. + pub fn from_i16(value: i16) -> Result { + match value { + 0 => Ok(Self::Add), + 1 => Ok(Self::Remove), + _ => Err(crate::KbError::Db(format!( + "invalid KbLiquidityEventKind value: {}", + value + ))), + } + } +} diff --git a/kb_lib/src/db/types/swap_trade_side.rs b/kb_lib/src/db/types/swap_trade_side.rs new file mode 100644 index 0000000..084dde6 --- /dev/null +++ b/kb_lib/src/db/types/swap_trade_side.rs @@ -0,0 +1,38 @@ +// file: kb_lib/src/db/types/swap_trade_side.rs + +//! Swap trade side. + +/// Swap side relative to the normalized base token of the pair. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum KbSwapTradeSide { + /// Unknown or not yet derived. + Unknown, + /// Buy of the base token against the quote token. + BuyBase, + /// Sell of the base token against the quote token. + SellBase, +} + +impl KbSwapTradeSide { + /// Converts the trade side to its stable integer representation. + pub fn to_i16(self) -> i16 { + match self { + Self::Unknown => 0, + Self::BuyBase => 1, + Self::SellBase => 2, + } + } + + /// Restores a trade side from its stable integer representation. + pub fn from_i16(value: i16) -> Result { + match value { + 0 => Ok(Self::Unknown), + 1 => Ok(Self::BuyBase), + 2 => Ok(Self::SellBase), + _ => Err(crate::KbError::Db(format!( + "invalid KbSwapTradeSide value: {}", + value + ))), + } + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 6dcf665..aa8e66e 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -102,6 +102,24 @@ pub use crate::db::KbPoolTokenEntity; pub use crate::db::KbPoolTokenRole; pub use crate::db::KbTokenDto; pub use crate::db::KbTokenEntity; +pub use crate::db::KbLiquidityEventDto; +pub use crate::db::KbLiquidityEventEntity; +pub use crate::db::KbLiquidityEventKind; +pub use crate::db::KbSwapDto; +pub use crate::db::KbSwapEntity; +pub use crate::db::KbSwapTradeSide; +pub use crate::db::KbTokenBurnEventDto; +pub use crate::db::KbTokenBurnEventEntity; +pub use crate::db::KbTokenMintEventDto; +pub use crate::db::KbTokenMintEventEntity; +pub use crate::db::list_recent_liquidity_events; +pub use crate::db::list_recent_swaps; +pub use crate::db::list_recent_token_burn_events; +pub use crate::db::list_recent_token_mint_events; +pub use crate::db::upsert_liquidity_event; +pub use crate::db::upsert_swap; +pub use crate::db::upsert_token_burn_event; +pub use crate::db::upsert_token_mint_event; pub use crate::db::get_token_by_mint; pub use crate::db::list_dexes; pub use crate::db::upsert_dex;